From f4c18e85d10d2ce1409b5ddae8b9b82ac444339e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Wed, 11 Dec 2024 16:33:10 +0000 Subject: [PATCH] revert to using PubSub and killing socket in RT channel --- lib/realtime/api.ex | 6 +-- lib/realtime_web/channels/realtime_channel.ex | 9 +++++ test/e2e/tests.ts | 2 +- test/integration/rt_channel_test.exs | 40 +++++++++++++++---- test/realtime/api_test.exs | 20 ++-------- 5 files changed, 48 insertions(+), 29 deletions(-) diff --git a/lib/realtime/api.ex b/lib/realtime/api.ex index 22f1fa53a..84e26ba09 100644 --- a/lib/realtime/api.ex +++ b/lib/realtime/api.ex @@ -14,8 +14,6 @@ defmodule Realtime.Api do alias Realtime.GenCounter alias Realtime.Tenants - alias RealtimeWeb.UserSocket - @doc """ Returns the list of tenants. @@ -126,9 +124,7 @@ defmodule Realtime.Api do data: %{external_id: external_id} }) when is_map_key(changes, :jwt_jwks) or is_map_key(changes, :jwt_secret) do - external_id - |> UserSocket.subscribers_id() - |> RealtimeWeb.Endpoint.broadcast("disconnect", %{}) + Phoenix.PubSub.broadcast!(Realtime.PubSub, "realtime:operations:" <> external_id, :disconnect) end defp maybe_trigger_disconnect(_), do: nil diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 41649d051..cb7ba808e 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -318,6 +318,15 @@ defmodule RealtimeWeb.RealtimeChannel do end @impl true + def handle_info(:disconnect, socket) do + %{assigns: %{channel_name: channel_name, tenant_topic: tenant_topic}} = socket + Logger.info("Received operational call to disconnect channel") + push_system_message("system", socket, "ok", "Server requested disconnect", channel_name) + RealtimeWeb.Endpoint.broadcast(tenant_topic, "disconnect", %{}) + + {:stop, :shutdown, socket} + end + def handle_info(msg, socket) do log_error("UnhandledSystemMessage", msg) {:noreply, socket} diff --git a/test/e2e/tests.ts b/test/e2e/tests.ts index 71fb52de2..3bf52692a 100644 --- a/test/e2e/tests.ts +++ b/test/e2e/tests.ts @@ -3,7 +3,7 @@ import { createClient, SupabaseClient, RealtimeChannel, -} from "npm:@supabase/supabase-js@2.47.3"; +} from "npm:@supabase/supabase-js@latest"; import { assert, assertEquals, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index cfad462b7..586b2b86b 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -885,11 +885,19 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Phoenix.Socket.Message{event: "phx_reply"}, 500 assert_receive %Phoenix.Socket.Message{event: "presence_state"}, 500 - tenant = Tenants.get_tenant_by_external_id(@external_id) Realtime.Api.update_tenant(tenant, %{jwt_jwks: %{keys: ["potato"]}}) - :timer.sleep(5000) - refute Process.alive?(socket) + + assert_receive %Phoenix.Socket.Message{ + topic: ^realtime_topic, + event: "system", + payload: %{ + "extension" => "system", + "message" => "Server requested disconnect", + "status" => "ok" + } + }, + 500 end test "on jwt_secret the socket closes and sends a system message", %{topic: topic} do @@ -904,8 +912,17 @@ defmodule Realtime.Integration.RtChannelTest do tenant = Tenants.get_tenant_by_external_id(@external_id) Realtime.Api.update_tenant(tenant, %{jwt_secret: "potato"}) - :timer.sleep(5000) - refute Process.alive?(socket) + + assert_receive %Phoenix.Socket.Message{ + topic: ^realtime_topic, + event: "system", + payload: %{ + "extension" => "system", + "message" => "Server requested disconnect", + "status" => "ok" + } + }, + 500 end test "on other param changes the socket won't close and no message is sent", %{topic: topic} do @@ -920,8 +937,17 @@ defmodule Realtime.Integration.RtChannelTest do tenant = Tenants.get_tenant_by_external_id(@external_id) Realtime.Api.update_tenant(tenant, %{max_concurrent_users: 100}) - :timer.sleep(5000) - assert Process.alive?(socket) + + refute_receive %Phoenix.Socket.Message{ + topic: ^realtime_topic, + event: "system", + payload: %{ + "extension" => "system", + "message" => "Server requested disconnect", + "status" => "ok" + } + }, + 500 end end diff --git a/test/realtime/api_test.exs b/test/realtime/api_test.exs index 605c993f8..0ce268acd 100644 --- a/test/realtime/api_test.exs +++ b/test/realtime/api_test.exs @@ -61,7 +61,7 @@ defmodule Realtime.ApiTest do Enum.each(tenants, fn tenant -> :ok = - RealtimeWeb.Endpoint.subscribe("user_socket:" <> tenant.external_id) + Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant.external_id) end) %{tenants: tenants} @@ -128,22 +128,14 @@ defmodule Realtime.ApiTest do tenants: [tenant | _] } do assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{jwt_jwks: %{keys: ["test"]}}) - - assert_receive %Phoenix.Socket.Broadcast{ - topic: "user_socket:external_id1", - event: "disconnect" - } + assert_receive :disconnect end test "update_tenant/2 with valid data and jwt_secret change will send disconnect event", %{ tenants: [tenant | _] } do assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{jwt_secret: "potato"}) - - assert_receive %Phoenix.Socket.Broadcast{ - topic: "user_socket:external_id1", - event: "disconnect" - } + assert_receive :disconnect end test "update_tenant/2 with valid data but not updating jwt_secret or jwt_jwks won't send event", @@ -151,11 +143,7 @@ defmodule Realtime.ApiTest do tenants: [tenant | _] } do assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{max_events_per_second: 100}) - - refute_receive %Phoenix.Socket.Broadcast{ - topic: "user_socket:external_id1", - event: "disconnect" - } + refute_receive :disconnect end test "delete_tenant/1 deletes the tenant", %{tenants: [tenant | _]} do