Skip to content

[WIP] back-pressure via tcp active: false in ReplicationConnection #1281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 94 additions & 1 deletion lib/sequin/postgres/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ defmodule Sequin.Postgres.ReplicationConnection do
defstruct protocol: nil,
state: nil,
reconnect_backoff: 500,
streaming: nil
streaming: nil,
# `true` while the connection is paused (socket intended to be `active: false`).
# Set by the pause handlers once `Protocol.checkout/1` succeeds and cleared
# again on a successful resume.
paused: false,
# Messages received as `:info` events while paused. New entries are
# prepended (newest first), so the list is reversed before being replayed
# on resume.
pause_buffer: []

## PUBLIC API ##

Expand Down Expand Up @@ -224,6 +228,9 @@ defmodule Sequin.Postgres.ReplicationConnection do
| {:query, query, state}
| {:stream, query, stream_opts, state}
| {:disconnect, reason, state}
# back-pressure controls
| {:pause, state}
| {:resume, state}

@doc """
Callback for `Kernel.send/2`.
Expand Down Expand Up @@ -427,6 +434,28 @@ defmodule Sequin.Postgres.ReplicationConnection do
end
end

@doc """
Asks the connection process to stop consuming replication messages.

Intended for debugging or manual intervention. The request is delivered as an
asynchronous cast (`:pause`), so `:ok` is returned without waiting for the
pause to take effect.

Pausing is meant to switch the underlying socket to `active: false`, which
lets TCP back-pressure build up towards PostgreSQL. Messages that were already
delivered to the process mailbox are kept internally and are processed once
`resume/1` is called.

Nothing resumes the connection automatically: every call to `pause/1` MUST be
followed by an explicit call to `resume/1`.
"""
@spec pause(server) :: :ok
def pause(server), do: :gen_statem.cast(server, :pause)

@doc """
Asks the connection process to start consuming replication messages again.

This is the counterpart of `pause/1`. The request is delivered as an
asynchronous cast (`:resume`), so `:ok` is returned without waiting for the
connection to actually resume.
"""
@spec resume(server) :: :ok
def resume(server), do: :gen_statem.cast(server, :resume)

## CALLBACKS ##

@state :no_state
Expand Down Expand Up @@ -512,6 +541,44 @@ defmodule Sequin.Postgres.ReplicationConnection do
{:keep_state, s, {:next_event, :internal, {:connect, :reconnect}}}
end

# Pause request (debugging / manual intervention) received while the
# connection is already paused: nothing to do, keep the data untouched.
def handle_event(:cast, :pause, @state, %{paused: true} = data), do: {:keep_state, data}

# Pause request on a connection that is not paused yet: check the protocol
# out and, on success, flag the connection as paused. A `:disconnect` result
# is handed over to `reconnect_or_stop/4`.
def handle_event(:cast, :pause, @state, %{protocol: protocol} = data) do
  checkout_result = Protocol.checkout(protocol)

  case checkout_result do
    {:disconnect, reason, protocol} ->
      reconnect_or_stop(:disconnect, reason, protocol, data)

    {:ok, protocol} ->
      {:keep_state, %{data | protocol: protocol, paused: true}}
  end
end

# Resume request received while the connection is not paused: nothing to do,
# keep the data untouched.
def handle_event(:cast, :resume, @state, %{paused: false} = data), do: {:keep_state, data}

# Resume request on a paused connection: check the protocol back in and
# replay every message that was buffered while paused.
#
# The buffered messages are re-injected as `:next_event` actions instead of
# being re-sent to `self()`. `gen_statem` processes `:next_event` events, in
# list order, before anything already sitting in the process mailbox, so a
# fresh socket message that arrives right after the check-in cannot overtake
# the older, buffered ones. They are still delivered as `:info` events, i.e.
# with the same event type they originally had.
def handle_event(:cast, :resume, @state, %{protocol: proto, pause_buffer: buf} = s) do
  case Protocol.checkin(proto) do
    {:ok, proto} ->
      # The buffer is built by prepending (newest first); reverse it to
      # restore arrival order before replaying.
      replay = for packet <- Enum.reverse(buf), do: {:next_event, :info, packet}

      {:keep_state, %{s | protocol: proto, paused: false, pause_buffer: []}, replay}

    {:disconnect, reason, proto} ->
      # The buffered packets belong to the connection that is going away, and
      # a stale `paused: true` flag would make later pause/resume requests act
      # on the wrong assumption; reset both before handing over.
      reconnect_or_stop(:disconnect, reason, proto, %{s | paused: false, pause_buffer: []})
  end
end

# While paused, any `:info` event that reaches this clause is stashed instead
# of being handled. Entries are prepended, so the buffer is newest-first.
# NOTE(review): this only takes effect if no earlier `:info` clause matches the
# message first — confirm the clause ordering in the full module.
def handle_event(:info, message, @state, %{paused: true, pause_buffer: buffered} = data) do
  {:keep_state, %{data | pause_buffer: [message | buffered]}}
end

## Helpers

# Empty list: nothing (left) to handle, so keep the current state and data.
defp handle_data([], s), do: {:keep_state, s}
Expand Down Expand Up @@ -588,6 +655,32 @@ defmodule Sequin.Postgres.ReplicationConnection do

{:stop, reason, mod_state} ->
{:stop, reason, %{s | state: {mod, mod_state}}}

{:pause, mod_state} when not s.paused ->
s = %{s | state: {mod, mod_state}}

case Protocol.checkout(s.protocol) do
{:ok, proto} ->
{:keep_state, %{s | protocol: proto, paused: true}}

{:disconnect, reason, proto} ->
reconnect_or_stop(:disconnect, reason, proto, s)
end

{:resume, mod_state} when s.paused ->
s = %{s | state: {mod, mod_state}}

case Protocol.checkin(s.protocol) do
{:ok, proto} ->
s.pause_buffer
|> Enum.reverse()
|> Enum.each(&send(self(), &1))

{:keep_state, %{s | protocol: proto, paused: false, pause_buffer: []}}

{:disconnect, reason, proto} ->
reconnect_or_stop(:disconnect, reason, proto, s)
end
end
end

Expand Down