Skip to content

Commit a31652a

Browse files
MousakaPhilip Pålssonemilklasson
authored
Api improvements (#16)
* Add Async API * Use Map<String, String> everywhere for headers instead of List<(string * string)> Co-authored-by: Philip Pålsson <[email protected]> Co-authored-by: Emil Klasson <[email protected]>
1 parent 2969215 commit a31652a

File tree

1 file changed

+88
-51
lines changed

1 file changed

+88
-51
lines changed

src/Insurello.RabbitMqClient/MqClient.fs

Lines changed: 88 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@ module MqClient =
2323

2424
type Model = private | Model of ModelData
2525

26+
/// <summary>
27+
/// The maximum number of MQ messages to be fetched from queues and get processed at a time by the RabbitMQ client.
28+
/// We recommend setting it to DefaultToTen if you don't know what you are doing.
29+
/// </summary>
30+
/// <returns>PrefetchConfig</returns>
31+
[<RequireQualifiedAccess>]
32+
type PrefetchConfig =
33+
| DefaultToTen
34+
| Count of int
35+
2636
type Callbacks =
2737
{ OnReceived: ReceivedMessage -> Async<unit>
2838
OnRegistered: ConsumerEventArgs Message -> Async<unit>
@@ -66,7 +76,6 @@ module MqClient =
6676
{ withConfirmSelect: bool
6777
prefetchCount: uint16 }
6878

69-
type PrefetchCount = PrefetchCount of uint16
7079

7180
let private contentTypeStringFromContent: Content -> string =
7281
function
@@ -326,6 +335,28 @@ module MqClient =
326335
fun (Message (event, model)) ->
327336
model.channelConsumer.Model.BasicNack(deliveryTag = event.DeliveryTag, multiple = false, requeue = false)
328337

338+
/// <summary>As <see cref="MqClient.ackMessage">ackMessage</see> but wrapped with Async</summary>
339+
let ackMessageAsync: ReceivedMessage -> Async<unit> = ackMessage >> Async.singleton
340+
341+
/// <summary>As <see cref="MqClient.nackMessage">nackMessage</see> but wrapped with Async</summary>
342+
let nackMessageAsync: ReceivedMessage -> Async<unit> = nackMessage >> Async.singleton
343+
344+
/// <summary>As <see cref="MqClient.nackMessageWithoutRequeue">nackMessageWithoutRequeue</see> but wrapped with Async</summary>
345+
let nackMessageWithoutRequeueAsync: ReceivedMessage -> Async<unit> =
346+
nackMessageWithoutRequeue >> Async.singleton
347+
348+
349+
let nackMessageWithDelay: System.TimeSpan -> ReceivedMessage -> Async<unit> =
350+
fun delay msg ->
351+
let clamp minValue maxValue value = value |> max minValue |> min maxValue
352+
353+
delay.TotalMilliseconds
354+
|> round
355+
|> clamp 0.0 (float System.Int32.MaxValue)
356+
|> int
357+
|> Async.Sleep
358+
|> Async.bind (fun () -> nackMessageAsync msg)
359+
329360
let messageBody: ReceivedMessage -> byte [] = fun (Message (event, _)) -> event.Body
330361

331362
let messageBodyAsString: ReceivedMessage -> RawBody =
@@ -334,55 +365,62 @@ module MqClient =
334365
let messageId: ReceivedMessage -> string =
335366
fun (Message (event, _)) -> event.BasicProperties.MessageId
336367

337-
let init: LogError -> string -> System.Uri -> Option<PrefetchCount> -> (Model -> Topology) -> Result<Model, string> =
338-
fun logError nameOfClient uri prefetchCountOption getTopology ->
339-
let prefetchCount =
340-
prefetchCountOption
341-
|> Option.map (fun (PrefetchCount prefetchCount) -> prefetchCount)
342-
|> Option.defaultValue (uint16 10)
343-
344-
connect nameOfClient uri
345-
|> Result.bind (fun connection ->
346-
let exCallback =
347-
(fun ex context connection ->
348-
logError (ex, "Unhandled exception on channel in context {$c}", context)
349-
closeConnection 3000 connection)
350-
351-
createChannel
352-
{ withConfirmSelect = true
353-
prefetchCount = prefetchCount } exCallback connection
354-
|> Result.bind (fun channel ->
368+
let private uint16FromPrefetchConfig: PrefetchConfig -> Result<uint16, string> =
369+
fun prefetchCount ->
370+
match prefetchCount with
371+
| PrefetchConfig.DefaultToTen -> Ok(uint16 10)
372+
| PrefetchConfig.Count count ->
373+
if count >= 0
374+
then Ok(uint16 count)
375+
else Error "PrefetchCount value must be a non-negative number"
376+
377+
let init: LogError -> string -> System.Uri -> PrefetchConfig -> (Model -> Topology) -> Result<Model, string> =
378+
fun logError nameOfClient uri prefetchConfig getTopology ->
379+
380+
prefetchConfig
381+
|> uint16FromPrefetchConfig
382+
|> Result.bind (fun prefetchCount ->
383+
connect nameOfClient uri
384+
|> Result.bind (fun connection ->
385+
let exCallback =
386+
(fun ex context connection ->
387+
logError (ex, "Unhandled exception on channel in context {$c}", context)
388+
closeConnection 3000 connection)
389+
355390
createChannel
356-
{ withConfirmSelect = false
391+
{ withConfirmSelect = true
357392
prefetchCount = prefetchCount } exCallback connection
358-
|> Result.map (fun rpcChannel ->
359-
(connection,
360-
Model
361-
{ channelConsumer = AsyncEventingBasicConsumer channel
362-
363-
rpcConsumer = AsyncEventingBasicConsumer rpcChannel
364-
365-
pendingRequests =
366-
System.Collections.Concurrent.ConcurrentDictionary<string, Result<ReceivedMessage, string> System.Threading.Tasks.TaskCompletionSource>
367-
() }))))
368-
|> Result.bind (fun (connection, model) ->
369-
let declareAQueue = declareQueue model
370-
let bindAQueue = bindQueueToExchange model
371-
372-
let consumeAQueue =
373-
consumeQueue model (System.Guid.NewGuid().ToString())
374-
375-
getTopology model
376-
|> List.fold (fun prevResult queueTopology ->
377-
Result.bind (fun _ ->
378-
declareAQueue queueTopology
379-
|> Result.bind bindAQueue
380-
|> Result.map consumeAQueue) prevResult) (Ok model)
381-
|> Result.mapError (fun error ->
382-
closeConnection 3000 connection
383-
error)
384-
|> Result.map initReplyQueue)
385-
393+
|> Result.bind (fun channel ->
394+
createChannel
395+
{ withConfirmSelect = false
396+
prefetchCount = prefetchCount } exCallback connection
397+
|> Result.map (fun rpcChannel ->
398+
(connection,
399+
Model
400+
{ channelConsumer = AsyncEventingBasicConsumer channel
401+
402+
rpcConsumer = AsyncEventingBasicConsumer rpcChannel
403+
404+
pendingRequests =
405+
System.Collections.Concurrent.ConcurrentDictionary<string, Result<ReceivedMessage, string> System.Threading.Tasks.TaskCompletionSource>
406+
() }))))
407+
|> Result.bind (fun (connection, model) ->
408+
let declareAQueue = declareQueue model
409+
let bindAQueue = bindQueueToExchange model
410+
411+
let consumeAQueue =
412+
consumeQueue model (System.Guid.NewGuid().ToString())
413+
414+
getTopology model
415+
|> List.fold (fun prevResult queueTopology ->
416+
Result.bind (fun _ ->
417+
declareAQueue queueTopology
418+
|> Result.bind bindAQueue
419+
|> Result.map consumeAQueue) prevResult) (Ok model)
420+
|> Result.mapError (fun error ->
421+
closeConnection 3000 connection
422+
error)
423+
|> Result.map initReplyQueue))
386424
/// Will publish with confirm.
387425
let publishToQueue: Model -> System.TimeSpan -> string -> PublishMessage -> Async<PublishResult> =
388426
fun (Model model) timeout routingKey message ->
@@ -460,15 +498,14 @@ module MqClient =
460498
}
461499

462500

463-
let replyToMessage: Model -> ReceivedMessage -> (string * string) list -> Content -> Async<PublishResult> =
501+
let replyToMessage: Model -> ReceivedMessage -> Map<string, string> -> Content -> Async<PublishResult> =
464502
fun (Model model) receivedMessage headers content ->
465503
receivedMessage
466504
|> extractReplyProperties
467505
|> AsyncResult.fromResult
468506
|> Async.map (function
469507
| Ok replyProperties ->
470-
let headers =
471-
Map([ ("sequence_end", "true") ] @ headers) // sequence_end is required by Rabbot clients (https://github.com/arobson/rabbot/issues/76)
508+
let headers = Map.add "sequence_end" "true" headers // sequence_end is required by Rabbot clients (https://github.com/arobson/rabbot/issues/76)
472509

473510
let (contentType, body) =
474511
match content with

0 commit comments

Comments
 (0)