Skip to content

Commit

Permalink
Fix purge_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
mworrell committed May 31, 2024
1 parent 66cb9ce commit 833c4d3
Showing 1 changed file with 26 additions and 27 deletions.
53 changes: 26 additions & 27 deletions src/mqtt_sessions_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@
-type packet_id() :: 0..65535. % ?MAX_PACKET_ID


-record(queued, {
msg_nr :: pos_integer(),
type :: atom(),
packet_id = undefined :: undefined | packet_id(),
queued :: non_neg_integer(),
expiry :: non_neg_integer(),
qos = 0 :: 0..2,
message :: mqtt_packet_map:mqtt_packet()
}).

-record(wait_for, {
msg_nr :: pos_integer(),
type :: atom(),
message = undefined :: undefined | mqtt_packet_map:mqtt_packet(),
is_sent = true :: boolean(),
queued :: non_neg_integer()
}).

-record(state, {
protocol_version :: mqtt_packet_map:mqtt_version(),
pool :: atom(),
Expand All @@ -85,10 +103,10 @@
connection_pid = undefined :: pid() | undefined,
is_session_present = false :: boolean(),
is_connected = false :: boolean(),
buffer :: #{},
buffer = #{} :: #{ non_neg_integer() => #queued{} },
packet_id = 1 :: packet_id(),
send_quota = ?RECEIVE_MAXIMUM :: non_neg_integer(),
awaiting_ack = #{} :: map(), % Initiated by server
awaiting_ack = #{} :: #{ non_neg_integer() => #wait_for{} }, % Initiated by server
awaiting_rel = #{} :: map(), % Initiated by client
will = undefined :: undefined | map(),
will_pid = undefined :: undefined | pid(),
Expand All @@ -108,24 +126,6 @@
publish_jobs = #{} :: map()
}).

-record(queued, {
msg_nr :: pos_integer(),
type :: atom(),
packet_id = undefined :: undefined | packet_id(),
queued :: non_neg_integer(),
expiry :: non_neg_integer(),
qos = 0 :: 0..2,
message :: mqtt_packet_map:mqtt_packet()
}).

-record(wait_for, {
msg_nr :: pos_integer(),
type :: atom(),
message = undefined :: undefined | mqtt_packet_map:mqtt_packet(),
is_sent = true :: boolean(),
queued :: non_neg_integer()
}).


-include_lib("kernel/include/logger.hrl").
-include_lib("mqtt_packet_map/include/mqtt_packet_map.hrl").
Expand Down Expand Up @@ -1150,7 +1150,7 @@ queue(#{ type := Type } = Msg, MsgNr, #state{ buffer = Buffer } = State) ->

maybe_purge(#state{ buffer = Buffer, awaiting_ack = WaitAcks } = State) ->
State#state{
buffer = maybe_purge_buffer(Buffer),
buffer = maybe_purge_buffer(maps:size(Buffer), Buffer),
awaiting_ack = maybe_purge_ack(WaitAcks)
}.

Expand All @@ -1167,11 +1167,10 @@ maybe_purge_ack(WaitAcks) ->
end,
WaitAcks).

maybe_purge_buffer(Buffer) when is_map(Buffer) ->
case maps:size(Buffer) > ?MAX_BUFFERED of
true -> purge_buffer(Buffer);
false -> Buffer
end.
maybe_purge_buffer(Size, Buffer) when Size > ?MAX_BUFFERED->
purge_buffer(Buffer);
maybe_purge_buffer(_Size, Buffer) ->
Buffer.

purge_buffer(Buffer) ->
Now = mqtt_sessions_timestamp:timestamp(),
Expand Down Expand Up @@ -1200,7 +1199,7 @@ purge_buffer(Buffer) ->
case maps:size(Buffer1) > ?MAX_BUFFERED of
true ->
% Drop all QoS 0 messages
maps:filter(fun(#queued{ qos = QoS }) -> QoS > 0 end, Buffer1);
maps:filter(fun(_, #queued{ qos = QoS }) -> QoS > 0 end, Buffer1);
false ->
Buffer1
end.
Expand Down

0 comments on commit 833c4d3

Please sign in to comment.