diff --git a/src/mqtt_sessions_process.erl b/src/mqtt_sessions_process.erl index fd02a27..3cd0f77 100644 --- a/src/mqtt_sessions_process.erl +++ b/src/mqtt_sessions_process.erl @@ -74,6 +74,24 @@ -type packet_id() :: 0..65535. % ?MAX_PACKET_ID +-record(queued, { + msg_nr :: pos_integer(), + type :: atom(), + packet_id = undefined :: undefined | packet_id(), + queued :: non_neg_integer(), + expiry :: non_neg_integer(), + qos = 0 :: 0..2, + message :: mqtt_packet_map:mqtt_packet() +}). + +-record(wait_for, { + msg_nr :: pos_integer(), + type :: atom(), + message = undefined :: undefined | mqtt_packet_map:mqtt_packet(), + is_sent = true :: boolean(), + queued :: non_neg_integer() +}). + -record(state, { protocol_version :: mqtt_packet_map:mqtt_version(), pool :: atom(), @@ -85,10 +103,10 @@ connection_pid = undefined :: pid() | undefined, is_session_present = false :: boolean(), is_connected = false :: boolean(), - buffer :: #{}, + buffer = #{} :: #{ non_neg_integer() => #queued{} }, packet_id = 1 :: packet_id(), send_quota = ?RECEIVE_MAXIMUM :: non_neg_integer(), - awaiting_ack = #{} :: map(), % Initiated by server + awaiting_ack = #{} :: #{ non_neg_integer() => #wait_for{} }, % Initiated by server awaiting_rel = #{} :: map(), % Initiated by client will = undefined :: undefined | map(), will_pid = undefined :: undefined | pid(), @@ -108,24 +126,6 @@ publish_jobs = #{} :: map() }). --record(queued, { - msg_nr :: pos_integer(), - type :: atom(), - packet_id = undefined :: undefined | packet_id(), - queued :: non_neg_integer(), - expiry :: non_neg_integer(), - qos = 0 :: 0..2, - message :: mqtt_packet_map:mqtt_packet() -}). - --record(wait_for, { - msg_nr :: pos_integer(), - type :: atom(), - message = undefined :: undefined | mqtt_packet_map:mqtt_packet(), - is_sent = true :: boolean(), - queued :: non_neg_integer() -}). - -include_lib("kernel/include/logger.hrl"). -include_lib("mqtt_packet_map/include/mqtt_packet_map.hrl"). @@ -1150,7 +1150,7 @@ queue(#{ type := Type } = Msg, MsgNr, #state{ buffer = Buffer } = State) -> maybe_purge(#state{ buffer = Buffer, awaiting_ack = WaitAcks } = State) -> State#state{ - buffer = maybe_purge_buffer(Buffer), + buffer = maybe_purge_buffer(maps:size(Buffer), Buffer), awaiting_ack = maybe_purge_ack(WaitAcks) }. @@ -1167,11 +1167,10 @@ maybe_purge_ack(WaitAcks) -> end, WaitAcks). -maybe_purge_buffer(Buffer) when is_map(Buffer) -> - case maps:size(Buffer) > ?MAX_BUFFERED of - true -> purge_buffer(Buffer); - false -> Buffer - end. +maybe_purge_buffer(Size, Buffer) when Size > ?MAX_BUFFERED-> + purge_buffer(Buffer); +maybe_purge_buffer(_Size, Buffer) -> + Buffer. purge_buffer(Buffer) -> Now = mqtt_sessions_timestamp:timestamp(), @@ -1200,7 +1199,7 @@ purge_buffer(Buffer) -> case maps:size(Buffer1) > ?MAX_BUFFERED of true -> % Drop all QoS 0 messages - maps:filter(fun(#queued{ qos = QoS }) -> QoS > 0 end, Buffer1); + maps:filter(fun(_, #queued{ qos = QoS }) -> QoS > 0 end, Buffer1); false -> Buffer1 end.