Skip to content

Commit f882a28

Browse files
Merge pull request #13996 from rabbitmq/issue-13785
MQTT: disconnect consumer when queue is deleted
2 parents 2b2e4d4 + b48ab72 commit f882a28

File tree

5 files changed

+81
-18
lines changed

5 files changed

+81
-18
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1984,20 +1984,33 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
19841984
State ->
19851985
{ok, State}
19861986
catch throw:consuming_queue_down ->
1987-
{error, consuming_queue_down}
1987+
{error, consuming_queue_down}
19881988
end;
19891989
{eol, QStates1, QRef} ->
19901990
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QRef, U0),
19911991
QStates = rabbit_queue_type:remove(QRef, QStates1),
19921992
State = State0#state{queue_states = QStates,
19931993
unacked_client_pubs = U},
19941994
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
1995-
{ok, State}
1995+
try handle_queue_down(QName, State) of
1996+
State ->
1997+
{ok, State}
1998+
catch throw:consuming_queue_down ->
1999+
{error, consuming_queue_down}
2000+
end
19962001
end.
19972002

19982003
-spec handle_queue_event(
19992004
{queue_event, rabbit_amqqueue:name() | ?QUEUE_TYPE_QOS_0, term()}, state()) ->
20002005
{ok, state()} | {error, Reason :: any(), state()}.
2006+
handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, {queue_down, QName}},
2007+
State0) ->
2008+
try handle_queue_down(QName, State0) of
2009+
State ->
2010+
{ok, State}
2011+
catch throw:consuming_queue_down ->
2012+
{error, consuming_queue_down, State0}
2013+
end;
20012014
handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, Msg},
20022015
State0 = #state{qos0_messages_dropped = N}) ->
20032016
State = case drop_qos0_message(State0) of
@@ -2018,13 +2031,17 @@ handle_queue_event({queue_event, QName, Evt},
20182031
State = handle_queue_actions(Actions, State1),
20192032
{ok, State};
20202033
{eol, Actions} ->
2021-
State1 = handle_queue_actions(Actions, State0),
2022-
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
2023-
QStates = rabbit_queue_type:remove(QName, QStates0),
2024-
State = State1#state{queue_states = QStates,
2025-
unacked_client_pubs = U},
2026-
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
2027-
{ok, State};
2034+
try
2035+
State1 = handle_queue_actions(Actions ++ [{queue_down, QName}], State0),
2036+
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
2037+
QStates = rabbit_queue_type:remove(QName, QStates0),
2038+
State = State1#state{queue_states = QStates,
2039+
unacked_client_pubs = U},
2040+
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
2041+
{ok, State}
2042+
catch throw:consuming_queue_down ->
2043+
{error, consuming_queue_down, State0}
2044+
end;
20282045
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
20292046
{error, Error, State0}
20302047
end.

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
126126
log_delete(QName, amqqueue:get_exclusive_owner(Q)),
127127
case rabbit_amqqueue:internal_delete(Q, ActingUser) of
128128
ok ->
129+
Pid = amqqueue:get_pid(Q),
130+
delegate:invoke_no_result([Pid], {gen_server, cast, [{queue_event, ?MODULE, {queue_down, QName}}]}),
129131
{ok, 0};
130132
{error, timeout} = Err ->
131133
Err

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ handle_cast(QueueEvent = {queue_event, _, _},
131131
try rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of
132132
{ok, PState} ->
133133
maybe_process_deferred_recv(control_throttle(pstate(State, PState)));
134+
{error, consuming_queue_down = Reason, PState} ->
135+
{stop, {shutdown, Reason}, pstate(State, PState)};
134136
{error, Reason0, PState} ->
135137
{stop, Reason0, pstate(State, PState)}
136138
catch throw:{send_failed, Reason1} ->

deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
-import(rabbit_ct_broker_helpers,
2626
[rabbitmqctl_list/3,
27+
rabbitmqctl/3,
2728
rpc/4,
2829
rpc/5,
2930
rpc_all/4,
@@ -125,6 +126,9 @@ cluster_size_1_tests() ->
125126
,retained_message_conversion
126127
,bind_exchange_to_exchange
127128
,bind_exchange_to_exchange_single_message
129+
,notify_consumer_classic_queue_deleted
130+
,notify_consumer_quorum_queue_deleted
131+
,notify_consumer_qos0_queue_deleted
128132
].
129133

130134
cluster_size_3_tests() ->
@@ -167,8 +171,8 @@ init_per_suite(Config) ->
167171
end_per_suite(Config) ->
168172
rabbit_ct_helpers:run_teardown_steps(Config).
169173

170-
init_per_group(mqtt, Config) ->
171-
rabbit_ct_helpers:set_config(Config, {websocket, false});
174+
init_per_group(mqtt, Config0) ->
175+
rabbit_ct_helpers:set_config(Config0, {websocket, false});
172176
init_per_group(Group, Config)
173177
when Group =:= v3;
174178
Group =:= v4;
@@ -208,7 +212,8 @@ init_per_testcase(T, Config)
208212
init_per_testcase(T, Config)
209213
when T =:= clean_session_disconnect_client;
210214
T =:= clean_session_node_restart;
211-
T =:= clean_session_node_kill ->
215+
T =:= clean_session_node_kill;
216+
T =:= notify_consumer_qos0_queue_deleted ->
212217
ok = rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]),
213218
init_per_testcase0(T, Config);
214219
init_per_testcase(Testcase, Config) ->
@@ -225,7 +230,8 @@ end_per_testcase(T, Config)
225230
end_per_testcase(T, Config)
226231
when T =:= clean_session_disconnect_client;
227232
T =:= clean_session_node_restart;
228-
T =:= clean_session_node_kill ->
233+
T =:= clean_session_node_kill;
234+
T =:= notify_consumer_qos0_queue_deleted ->
229235
ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]),
230236
end_per_testcase0(T, Config);
231237
end_per_testcase(Testcase, Config) ->
@@ -324,9 +330,7 @@ will_without_disconnect(Config) ->
324330
%% Test that an MQTT connection decodes the AMQP 0.9.1 'P_basic' properties.
325331
%% see https://github.com/rabbitmq/rabbitmq-server/discussions/8252
326332
decode_basic_properties(Config) ->
327-
App = rabbitmq_mqtt,
328-
Par = durable_queue_type,
329-
ok = rpc(Config, application, set_env, [App, Par, quorum]),
333+
set_durable_queue_type(Config),
330334
ClientId = Topic = Payload = atom_to_binary(?FUNCTION_NAME),
331335
C1 = connect(ClientId, Config, non_clean_sess_opts()),
332336
{ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1),
@@ -340,7 +344,7 @@ decode_basic_properties(Config) ->
340344
ok = emqtt:disconnect(C1),
341345
C2 = connect(ClientId, Config, [{clean_start, true}]),
342346
ok = emqtt:disconnect(C2),
343-
ok = rpc(Config, application, unset_env, [App, Par]),
347+
unset_durable_queue_type(Config),
344348
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
345349

346350
quorum_queue_rejects(Config) ->
@@ -1955,6 +1959,35 @@ bind_exchange_to_exchange_single_message(Config) ->
19551959
ok = emqtt:disconnect(C),
19561960
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
19571961

1962+
notify_consumer_qos0_queue_deleted(Config) ->
1963+
Topic = atom_to_binary(?FUNCTION_NAME),
1964+
notify_consumer_queue_deleted(Config, Topic, <<"MQTT QoS 0">>, [{retry_interval, 1}], qos0).
1965+
1966+
notify_consumer_classic_queue_deleted(Config) ->
1967+
Topic = atom_to_binary(?FUNCTION_NAME),
1968+
notify_consumer_queue_deleted(Config, Topic, <<"classic">>, non_clean_sess_opts(), qos0).
1969+
1970+
notify_consumer_quorum_queue_deleted(Config) ->
1971+
set_durable_queue_type(Config),
1972+
Topic = atom_to_binary(?FUNCTION_NAME),
1973+
notify_consumer_queue_deleted(Config, Topic, <<"quorum">>, non_clean_sess_opts(), qos1),
1974+
unset_durable_queue_type(Config).
1975+
1976+
notify_consumer_queue_deleted(Config, Name = Topic, ExpectedType, ConnOpts, Qos) ->
1977+
C = connect(Name, Config, ConnOpts),
1978+
{ok, _, _} = emqtt:subscribe(C, Topic, Qos),
1979+
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m1">>, qos1),
1980+
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m2">>, qos1),
1981+
ok = expect_publishes(C, Topic, [<<"m1">>, <<"m2">>]),
1982+
1983+
[[QName, Type]] = rabbitmqctl_list(Config, 0, ["list_queues", "name", "type", "--no-table-headers"]),
1984+
?assertMatch(ExpectedType, Type),
1985+
1986+
process_flag(trap_exit, true),
1987+
{ok, _} = rabbitmqctl(Config, 0, ["delete_queue", QName]),
1988+
1989+
await_exit(C).
1990+
19581991
%% -------------------------------------------------------------------
19591992
%% Internal helpers
19601993
%% -------------------------------------------------------------------
@@ -1985,7 +2018,7 @@ await_confirms_unordered(From, Left) ->
19852018
end.
19862019

19872020
await_consumer_count(ConsumerCount, ClientId, QoS, Config) ->
1988-
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
2021+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
19892022
QueueName = rabbit_mqtt_util:queue_name_bin(
19902023
rabbit_data_coercion:to_binary(ClientId), QoS),
19912024
eventually(
@@ -2030,3 +2063,9 @@ assert_v5_disconnect_reason_code(Config, ReasonCode) ->
20302063
after ?TIMEOUT -> ct:fail("missing DISCONNECT packet from server")
20312064
end
20322065
end.
2066+
2067+
set_durable_queue_type(Config) ->
2068+
ok = rpc(Config, application, set_env, [rabbitmq_mqtt, durable_queue_type, quorum]).
2069+
2070+
unset_durable_queue_type(Config) ->
2071+
ok = rpc(Config, application, unset_env, [rabbitmq_mqtt, durable_queue_type]).

deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,6 @@ duplicate_client_id(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
100100
publish_to_all_queue_types_qos0(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
101101
publish_to_all_queue_types_qos1(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
102102
maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
103+
notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
104+
notify_consumer_quorum_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
105+
notify_consumer_qos0_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).

0 commit comments

Comments
 (0)