@@ -57,8 +57,11 @@ module MqClient =
5757 Headers: Map < string , string >
5858 Content: Content }
5959
60- type ChannelConfig =
61- { withConfirmSelect: bool }
60+ type private ChannelConfig =
61+ { withConfirmSelect: bool
62+ prefetchCount: uint16 }
63+
64+ type PrefetchCount = PrefetchCount of uint16
6265
6366 let private contentTypeStringFromContent : Content -> string =
6467 function
@@ -171,6 +174,7 @@ module MqClient =
171174 fun config exCallback connection ->
172175 try
173176 let model = connection.CreateModel()
177+ model.BasicQos( uint32 0 , config.prefetchCount, false )
174178
175179 if config.withConfirmSelect then model.ConfirmSelect()
176180
@@ -306,18 +310,23 @@ module MqClient =
306310
307311 let messageId : ReceivedMessage -> string = fun ( Message ( event , _ )) -> event.BasicProperties.MessageId
308312
309- let init : LogError -> string -> System.Uri -> ( Model -> Topology ) -> Result < Model , string > =
310- fun logError nameOfClient uri getTopology ->
313+ let init : LogError -> string -> System.Uri -> Option < PrefetchCount > -> ( Model -> Topology ) -> Result < Model , string > =
314+ fun logError nameOfClient uri prefetchCountOption getTopology ->
315+ let prefetchCount =
316+ prefetchCountOption
317+ |> Option.map ( fun ( PrefetchCount prefetchCount ) -> prefetchCount)
318+ |> Option.defaultValue ( uint16 10 )
319+
311320 connect nameOfClient uri
312321 |> Result.bind ( fun connection ->
313322 let exCallback =
314323 ( fun ex context connection ->
315324 logError ( ex, " Unhandled exception on channel in context {$c}" , context)
316325 closeConnectionAsync ( System.TimeSpan.FromSeconds 3.0 ) connection)
317326
318- createChannel { withConfirmSelect = true } exCallback connection
327+ createChannel { withConfirmSelect = true ; prefetchCount = prefetchCount } exCallback connection
319328 |> Result.bind ( fun channel ->
320- createChannel { withConfirmSelect = false } exCallback connection
329+ createChannel { withConfirmSelect = false ; prefetchCount = prefetchCount } exCallback connection
321330 |> Result.map ( fun rpcChannel ->
322331 Model
323332 { channelConsumer = AsyncEventingBasicConsumer channel
0 commit comments