Skip to content

Commit

Permalink
revert to using PubSub and killing socket in RT channel
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Dec 11, 2024
1 parent 95d9f9b commit f4c18e8
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 29 deletions.
6 changes: 1 addition & 5 deletions lib/realtime/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ defmodule Realtime.Api do
alias Realtime.GenCounter
alias Realtime.Tenants

alias RealtimeWeb.UserSocket

@doc """
Returns the list of tenants.
Expand Down Expand Up @@ -126,9 +124,7 @@ defmodule Realtime.Api do
data: %{external_id: external_id}
})
when is_map_key(changes, :jwt_jwks) or is_map_key(changes, :jwt_secret) do
external_id
|> UserSocket.subscribers_id()
|> RealtimeWeb.Endpoint.broadcast("disconnect", %{})
Phoenix.PubSub.broadcast!(Realtime.PubSub, "realtime:operations:" <> external_id, :disconnect)
end

defp maybe_trigger_disconnect(_), do: nil
Expand Down
9 changes: 9 additions & 0 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,15 @@ defmodule RealtimeWeb.RealtimeChannel do
end

@impl true
def handle_info(:disconnect, socket) do
%{assigns: %{channel_name: channel_name, tenant_topic: tenant_topic}} = socket
Logger.info("Received operational call to disconnect channel")
push_system_message("system", socket, "ok", "Server requested disconnect", channel_name)
RealtimeWeb.Endpoint.broadcast(tenant_topic, "disconnect", %{})

{:stop, :shutdown, socket}
end

def handle_info(msg, socket) do
log_error("UnhandledSystemMessage", msg)
{:noreply, socket}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
createClient,
SupabaseClient,
RealtimeChannel,
} from "npm:@supabase/supabase-js@2.47.3";
} from "npm:@supabase/supabase-js@latest";
import {
assert,
assertEquals,
Expand Down
40 changes: 33 additions & 7 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -885,11 +885,19 @@ defmodule Realtime.Integration.RtChannelTest do

assert_receive %Phoenix.Socket.Message{event: "phx_reply"}, 500
assert_receive %Phoenix.Socket.Message{event: "presence_state"}, 500

tenant = Tenants.get_tenant_by_external_id(@external_id)
Realtime.Api.update_tenant(tenant, %{jwt_jwks: %{keys: ["potato"]}})
:timer.sleep(5000)
refute Process.alive?(socket)

assert_receive %Phoenix.Socket.Message{
topic: ^realtime_topic,
event: "system",
payload: %{
"extension" => "system",
"message" => "Server requested disconnect",
"status" => "ok"
}
},
500
end

test "on jwt_secret the socket closes and sends a system message", %{topic: topic} do
Expand All @@ -904,8 +912,17 @@ defmodule Realtime.Integration.RtChannelTest do

tenant = Tenants.get_tenant_by_external_id(@external_id)
Realtime.Api.update_tenant(tenant, %{jwt_secret: "potato"})
:timer.sleep(5000)
refute Process.alive?(socket)

assert_receive %Phoenix.Socket.Message{
topic: ^realtime_topic,
event: "system",
payload: %{
"extension" => "system",
"message" => "Server requested disconnect",
"status" => "ok"
}
},
500
end

test "on other param changes the socket won't close and no message is sent", %{topic: topic} do
Expand All @@ -920,8 +937,17 @@ defmodule Realtime.Integration.RtChannelTest do

tenant = Tenants.get_tenant_by_external_id(@external_id)
Realtime.Api.update_tenant(tenant, %{max_concurrent_users: 100})
:timer.sleep(5000)
assert Process.alive?(socket)

refute_receive %Phoenix.Socket.Message{
topic: ^realtime_topic,
event: "system",
payload: %{
"extension" => "system",
"message" => "Server requested disconnect",
"status" => "ok"
}
},
500
end
end

Expand Down
20 changes: 4 additions & 16 deletions test/realtime/api_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule Realtime.ApiTest do

Enum.each(tenants, fn tenant ->
:ok =
RealtimeWeb.Endpoint.subscribe("user_socket:" <> tenant.external_id)
Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant.external_id)
end)

%{tenants: tenants}
Expand Down Expand Up @@ -128,34 +128,22 @@ defmodule Realtime.ApiTest do
tenants: [tenant | _]
} do
assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{jwt_jwks: %{keys: ["test"]}})

assert_receive %Phoenix.Socket.Broadcast{
topic: "user_socket:external_id1",
event: "disconnect"
}
assert_receive :disconnect
end

test "update_tenant/2 with valid data and jwt_secret change will send disconnect event", %{
tenants: [tenant | _]
} do
assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{jwt_secret: "potato"})

assert_receive %Phoenix.Socket.Broadcast{
topic: "user_socket:external_id1",
event: "disconnect"
}
assert_receive :disconnect
end

test "update_tenant/2 with valid data but not updating jwt_secret or jwt_jwks won't send event",
%{
tenants: [tenant | _]
} do
assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{max_events_per_second: 100})

refute_receive %Phoenix.Socket.Broadcast{
topic: "user_socket:external_id1",
event: "disconnect"
}
refute_receive :disconnect
end

test "delete_tenant/1 deletes the tenant", %{tenants: [tenant | _]} do
Expand Down

0 comments on commit f4c18e8

Please sign in to comment.