Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions lib/phoenix/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,28 @@ defmodule Phoenix.PubSub do
Registry.unregister(pubsub, topic)
end

@doc """
Unsubscribes the caller from the PubSub adapter's topic taking the metadata into consideration.

Unlike `unsubscribe/2`, this function matches on the metadata provided as an option when subscribed.
This is useful when you have multiple subscriptions for the same topic with different metadata.

## Example

iex> PubSub.subscribe_match(:my_pubsub, "users:123", metadata: :fast)
:ok
iex> PubSub.subscribe_match(:my_pubsub, "users:123", metadata: :slow)
:ok
iex> PubSub.unsubscribe_match(:my_pubsub, "users:123", :fast)
:ok
# Only the :fast subscription is removed, :slow remains active

"""
@spec unsubscribe_match(t, topic, term) :: :ok
def unsubscribe_match(pubsub, topic, metadata) when is_atom(pubsub) and is_binary(topic) do
Registry.unregister_match(pubsub, topic, metadata)
end

@doc """
Broadcasts message on given topic across the whole cluster.

Expand Down Expand Up @@ -162,7 +184,7 @@ defmodule Phoenix.PubSub do

The default dispatcher will broadcast the message to all subscribers except for the
process that initiated the broadcast.

A custom dispatcher may also be given as a fifth, optional argument.
See the "Custom dispatching" section in the module documentation.
"""
Expand Down Expand Up @@ -202,7 +224,7 @@ defmodule Phoenix.PubSub do

The default dispatcher will broadcast the message to all subscribers except for the
process that initiated the broadcast.

A custom dispatcher may also be given as a fifth, optional argument.
See the "Custom dispatching" section in the module documentation.
"""
Expand Down
33 changes: 33 additions & 0 deletions test/shared/pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,39 @@ defmodule Phoenix.PubSubTest do
assert subscribers(config, config.topic) |> length == 0
end

@tag pool_size: size
test "pool #{size}: subscribe and unsubscribe with metadata", config do
pid = spawn_pid()
pid2 = spawn_pid()
assert subscribers(config, config.topic) |> length == 0

# Subscribe with different metadata variants
assert rpc(pid, fn ->
PubSub.subscribe(config.pubsub, config.topic, metadata: :custom)
end)

assert rpc(pid, fn ->
PubSub.subscribe(config.pubsub, config.topic, metadata: :other)
end)

assert rpc(pid2, fn -> PubSub.subscribe(config.pubsub, config.topic) end)

# Verify all subscriptions exist
assert length(subscribers(config, config.topic)) == 3
assert {pid, :custom} in subscribers(config, config.topic)
assert {pid, :other} in subscribers(config, config.topic)
assert {pid2, nil} in subscribers(config, config.topic)

# Unsubscribe only the :custom metadata subscription
assert rpc(pid, fn -> PubSub.unsubscribe_match(config.pubsub, config.topic, :custom) end)

# Verify only :custom was removed, others remain
assert length(subscribers(config, config.topic)) == 2
assert {pid, :other} in subscribers(config, config.topic)
assert {pid2, nil} in subscribers(config, config.topic)
refute {pid, :custom} in subscribers(config, config.topic)
end

@tag pool_size: size
test "pool #{size}: broadcast/3 and broadcast!/3 publishes message to each subscriber",
config do
Expand Down