Skip to content

Commit

Permalink
fix: On tenant delete, remove replication slots (#781)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco authored Jan 22, 2024
1 parent 11c512a commit 387e25b
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 10 deletions.
17 changes: 17 additions & 0 deletions lib/realtime/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,23 @@ defmodule Realtime.Helpers do
end
end

def replication_slot_teardown(tenant) do
{:ok, conn} = check_tenant_connection(tenant, "replication_slot_teardown")

with {:ok, %{rows: rows}} <-
Postgrex.query(
conn,
"select active_pid from pg_replication_slots where slot_name ilike '%realtime%'",
[]
) do
Enum.each(rows, fn [pid] ->
Postgrex.query(conn, "select pg_terminate_backend(#{pid})", [])
end)

:ok
end
end

defp stop_user_tenant_process(tenant, platform_region, acc) do
Extensions.PostgresCdcRls.handle_stop(tenant, 5_000)
# credo:disable-for-next-line
Expand Down
26 changes: 18 additions & 8 deletions lib/realtime_web/controllers/tenant_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ defmodule RealtimeWeb.TenantController do

alias Realtime.Api
alias Realtime.Api.Tenant
alias Realtime.Tenants
alias Realtime.Helpers
alias Realtime.PostgresCdc
alias RealtimeWeb.{Endpoint, UserSocket}
alias Realtime.Tenants
alias RealtimeWeb.Endpoint
alias RealtimeWeb.UserSocket

alias RealtimeWeb.OpenApiSchemas.{
EmptyResponse,
Expand Down Expand Up @@ -178,13 +180,21 @@ defmodule RealtimeWeb.TenantController do
def delete(conn, %{"tenant_id" => id}) do
Logger.metadata(external_id: id, project: id)

case Api.delete_tenant_by_external_id(id) do
true ->
id |> UserSocket.subscribers_id() |> Endpoint.broadcast("disconnect", %{})
Task.async(fn -> PostgresCdc.stop_all(id) end)

false ->
case Api.get_tenant_by_external_id(id) do
nil ->
Logger.error("Tenant #{id} does not exist")

tenant ->
id
|> UserSocket.subscribers_id()
|> Endpoint.broadcast("disconnect", %{})

Task.async(fn ->
PostgresCdc.stop_all(id)
Helpers.replication_slot_teardown(tenant)
end)

Api.delete_tenant_by_external_id(id)
end

send_resp(conn, 204, "")
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.25.59",
version: "2.25.60",
elixir: "~> 1.14.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
29 changes: 28 additions & 1 deletion test/realtime/helpers_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,31 @@
defmodule Realtime.HelpersTest do
use Realtime.DataCase, async: true
use Realtime.DataCase, async: false
# async: false due to the deletion of the replication slot potentially affecting other tests
doctest Realtime.Helpers
alias Realtime.Helpers

describe "replication_slot_teardown/1" do
setup do
%{tenant: tenant_fixture()}
end

test "removes replication slots with the realtime prefix", %{tenant: tenant} do
[extension] = tenant.extensions
args = Map.put(extension.settings, "id", random_string())
{:ok, pid} = start_supervised({Extensions.PostgresCdcStream.Replication, args})

{:ok, conn} = Helpers.check_tenant_connection(tenant, "realtime_test")
# Check replication slot was created
assert %{rows: [["supabase_realtime_replication_slot"]]} =
Postgrex.query!(conn, "SELECT slot_name FROM pg_replication_slots", [])

# Kill connections to database
Process.exit(pid, :normal)
Process.exit(conn, :normal)

Helpers.replication_slot_teardown(tenant)

assert %{rows: []} = Postgrex.query!(conn, "SELECT slot_name FROM pg_replication_slots", [])
end
end
end
8 changes: 8 additions & 0 deletions test/realtime_web/controllers/tenant_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,17 @@ defmodule RealtimeWeb.TenantControllerTest do
describe "delete tenant" do
setup [:create_tenant]

setup %{tenant: tenant} do
[extension] = tenant.extensions
args = Map.put(extension.settings, "id", random_string())
start_supervised!({Extensions.PostgresCdcStream.Replication, args})
:ok
end

test "deletes chosen tenant", %{conn: conn, tenant: tenant} do
with_mock JwtVerification, verify: fn _token, _secret -> {:ok, %{}} end do
conn = delete(conn, Routes.tenant_path(conn, :delete, tenant.external_id))
:timer.sleep(10000)
assert response(conn, 204)
conn = get(conn, Routes.tenant_path(conn, :show, tenant.external_id))
assert response(conn, 404)
Expand Down

0 comments on commit 387e25b

Please sign in to comment.