diff --git a/lib/sequin/postgres/replication_connection.ex b/lib/sequin/postgres/replication_connection.ex index 246f795a5..549fa48c9 100644 --- a/lib/sequin/postgres/replication_connection.ex +++ b/lib/sequin/postgres/replication_connection.ex @@ -150,7 +150,11 @@ defmodule Sequin.Postgres.ReplicationConnection do defstruct protocol: nil, state: nil, reconnect_backoff: 500, - streaming: nil + streaming: nil, + # indicates if socket is paused (active:false) + paused: false, + # queued tcp/ssl messages while paused + pause_buffer: [] ## PUBLIC API ## @@ -224,6 +228,9 @@ defmodule Sequin.Postgres.ReplicationConnection do | {:query, query, state} | {:stream, query, stream_opts, state} | {:disconnect, reason, state} + # back-pressure controls + | {:pause, state} + | {:resume, state} @doc """ Callback for `Kernel.send/2`. @@ -427,6 +434,28 @@ defmodule Sequin.Postgres.ReplicationConnection do end end + @doc """ + Temporarily stop receiving further replication messages. For debugging/intervention purposes. + + This sets the underlying socket to `active: false` which propagates TCP back‑pressure + to PostgreSQL. Pending messages that have already been delivered to the process + mailbox are stored internally and will be processed once `resume/1` is called. + + You MUST manually call `resume/1` after calling `pause/1`. + """ + @spec pause(server) :: :ok + def pause(server) do + :gen_statem.cast(server, :pause) + end + + @doc """ + Resume receiving replication messages after a previous `pause/1`. + """ + @spec resume(server) :: :ok + def resume(server) do + :gen_statem.cast(server, :resume) + end + ## CALLBACKS ## @state :no_state @@ -512,6 +541,44 @@ defmodule Sequin.Postgres.ReplicationConnection do {:keep_state, s, {:next_event, :internal, {:connect, :reconnect}}} end + # For debugging or intervention purposes + def handle_event(:cast, :pause, @state, %{paused: true} = s) do + # Already paused – nothing to do + {:keep_state, s} + end + + def handle_event(:cast, :pause, @state, %{protocol: proto} = s) do + case Protocol.checkout(proto) do + {:ok, proto} -> + {:keep_state, %{s | protocol: proto, paused: true}} + + {:disconnect, reason, proto} -> + reconnect_or_stop(:disconnect, reason, proto, s) + end + end + + def handle_event(:cast, :resume, @state, %{paused: false} = s) do + # Not paused – nothing to do + {:keep_state, s} + end + + def handle_event(:cast, :resume, @state, %{protocol: proto, pause_buffer: buf} = s) do + case Protocol.checkin(proto) do + {:ok, proto} -> + # Re‑arm socket and replay buffered packets + for packet <- Enum.reverse(buf), do: send(self(), packet) + + {:keep_state, %{s | protocol: proto, paused: false, pause_buffer: []}} + + {:disconnect, reason, proto} -> + reconnect_or_stop(:disconnect, reason, proto, s) + end + end + + def handle_event(:info, msg, @state, %{paused: true, pause_buffer: buf} = s) do + {:keep_state, %{s | pause_buffer: [msg | buf]}} + end + ## Helpers defp handle_data([], s), do: {:keep_state, s} @@ -588,6 +655,32 @@ defmodule Sequin.Postgres.ReplicationConnection do {:stop, reason, mod_state} -> {:stop, reason, %{s | state: {mod, mod_state}}} + + {:pause, mod_state} when not s.paused -> + s = %{s | state: {mod, mod_state}} + + case Protocol.checkout(s.protocol) do + {:ok, proto} -> + {:keep_state, %{s | protocol: proto, paused: true}} + + {:disconnect, reason, proto} -> + reconnect_or_stop(:disconnect, reason, proto, s) + end + + {:resume, mod_state} when s.paused -> + s = %{s | state: {mod, mod_state}} + + case Protocol.checkin(s.protocol) do + {:ok, proto} -> + s.pause_buffer + |> Enum.reverse() + |> Enum.each(&send(self(), &1)) + + {:keep_state, %{s | protocol: proto, paused: false, pause_buffer: []}} + + {:disconnect, reason, proto} -> + reconnect_or_stop(:disconnect, reason, proto, s) + end end end