@@ -48,17 +48,28 @@ module MqClient =
4848
4949 type LogError = exn * string * obj -> unit
5050
51- type PublishConfig =
52- { Timeout: System .TimeSpan
53- Endpoint: string
54- ContentType: string
55- CorrelationId: string
51+ type Content =
52+ | Json of string
53+ | Binary of byte array
54+
55+ type PublishMessage =
56+ { CorrelationId: string
5657 Headers: Map < string , string >
57- Body : byte [] }
58+ Content : Content }
5859
5960 type ChannelConfig =
6061 { withConfirmSelect: bool }
6162
63+ let private contentTypeStringFromContent : Content -> string =
64+ function
65+ | Json _ -> " application/json"
66+ | Binary _ -> " application/octet-stream"
67+
68+ let private bodyFromContent : Content -> byte [] =
69+ function
70+ | Json jsonContent -> System.Text.Encoding.UTF8.GetBytes jsonContent
71+ | Binary binaryContent -> binaryContent
72+
6273 let private extractReplyTo : BasicDeliverEventArgs -> Result < string , string > =
6374 fun event ->
6475 match event.BasicProperties.ReplyTo with
@@ -82,15 +93,6 @@ module MqClient =
8293
8394 let routingKeyFromMessage : ReceivedMessage -> string = fun ( Message ( event , _ )) -> event.RoutingKey
8495
85- let publishConfigJson : string -> string option -> Map < string , string > -> string -> PublishConfig =
86- fun endpoint correlationId headers message ->
87- { Timeout = System.TimeSpan.FromSeconds 30.0
88- Endpoint = endpoint
89- ContentType = " application/json"
90- Headers = headers
91- CorrelationId = Option.defaultValue " " correlationId
92- Body = message |> System.Text.Encoding.UTF8.GetBytes }
93-
9496 let private asTask : ModelData -> 'event -> ( Message < 'event > -> Async < unit >) -> System.Threading.Tasks.Task =
9597 fun model event callback ->
9698 async { do ! callback ( Message( event, model)) } |> Async.StartAsTask :> System.Threading.Tasks.Task
@@ -339,18 +341,18 @@ module MqClient =
339341 |> Result.map initReplyQueue)
340342
341343 /// Will publish with confirm.
342- let publishToQueue : Model -> PublishConfig -> Async < PublishResult > =
343- fun ( Model model ) config ->
344+ let publishToQueue : Model -> System.TimeSpan -> string -> PublishMessage -> Async < PublishResult > =
345+ fun ( Model model ) timeout routingKey message ->
344346 async {
345347 let tcs = System.Threading.Tasks.TaskCompletionSource< PublishResult>()
346- use ct = new System.Threading.CancellationTokenSource( config.Timeout )
348+ use ct = new System.Threading.CancellationTokenSource( timeout )
347349 use _ctr =
348350 ct.Token.Register
349351 ( callback =
350352 ( fun () ->
351353 tcs.SetResult
352- (( sprintf " Publish to queue '%s ' timedout after %s s" config.Endpoint
353- ( config.Timeout .TotalSeconds.ToString())) |> PublishResult.Timeout) |> ignore),
354+ (( sprintf " Publish to queue '%s ' timedout after %s s" routingKey
355+ ( timeout .TotalSeconds.ToString())) |> PublishResult.Timeout) |> ignore),
354356 useSynchronizationContext = false )
355357
356358 let messageId = System.Guid.NewGuid() .ToString()
@@ -370,15 +372,15 @@ module MqClient =
370372 model.channelConsumer.Model.BasicNacks.AddHandler basicNackEventHandler
371373
372374 model.channelConsumer.Model.BasicPublish
373- ( exchange = " " , routingKey = config.Endpoint , mandatory = true ,
375+ ( exchange = " " , routingKey = routingKey , mandatory = true ,
374376 basicProperties =
375377 model.channelConsumer.Model.CreateBasicProperties
376- ( ContentType = config.ContentType , Persistent = true , MessageId = messageId ,
377- CorrelationId = config .CorrelationId,
378+ ( ContentType = contentTypeStringFromContent message.Content , Persistent = true ,
379+ MessageId = messageId , CorrelationId = message .CorrelationId,
378380 Headers =
379- ( config .Headers
381+ ( message .Headers
380382 |> Map.map ( fun _ v -> v :> obj )
381- |> ( Map.toSeq >> dict))), body = config.Body )
383+ |> ( Map.toSeq >> dict))), body = bodyFromContent message.Content )
382384
383385 ( basicAckEventHandler, basicNackEventHandler))
384386
@@ -394,9 +396,6 @@ module MqClient =
394396 | Choice2Of2 reason -> PublishResult.Unknown( sprintf " Task cancelled: %A " reason)
395397 }
396398
397- type Content =
398- | Json of string
399- | Binary of byte array
400399
401400 let replyToMessage : Model -> ReceivedMessage -> ( string * string ) list -> Content -> Async < PublishResult > =
402401 fun ( Model model ) receivedMessage headers content ->
@@ -430,36 +429,17 @@ module MqClient =
430429 PublishResult.Acked
431430 | Error errorMessage -> PublishResult.Unknown errorMessage)
432431
433- let respond : Model -> ReceivedMessage -> string -> AsyncResult < PublishResult , string > =
434- fun ( Model model ) receivedMessage response ->
435- receivedMessage
436- |> extractReplyProperties
437- |> AsyncResult.fromResult
438- |> AsyncResult.map ( fun replyProperties ->
439- let config =
440- publishConfigJson replyProperties.ReplyTo ( Some replyProperties.CorrelationId)
441- ( Map.ofList [ ( " sequence_end" , " true" ) ]) response // sequence_end is required by Rabbot clients (https://github.com/arobson/rabbot/issues/76)
442-
443- let messageId = System.Guid.NewGuid() .ToString()
444-
445- model.channelConsumer.Model.BasicPublish
446- ( exchange = " " , routingKey = config.Endpoint, mandatory = false , // mandatory must be false when publishing to direct-reply-to queue https://www.rabbitmq.com/direct-reply-to.html#limitations
447- basicProperties =
448- model.channelConsumer.Model.CreateBasicProperties
449- ( ContentType = config.ContentType, Persistent = true , MessageId = messageId,
450- CorrelationId = config.CorrelationId,
451- Headers =
452- ( config.Headers
453- |> Map.map ( fun _ v -> v :> obj)
454- |> ( Map.toSeq >> dict))), body = config.Body)
455-
456- PublishResult.Acked)
457-
458- let request : Model -> PublishConfig -> AsyncResult < ReceivedMessage , string > =
459- fun ( Model model ) config ->
432+ /// <summary>Make an RPC-call to a RabbitMq queue.</summary>
433+ /// <param name="Model">MqClient model.</param>
434+ /// <param name="timeout">Seconds before timing out request.</param>
435+ /// <param name="routingKey">Routing key to publish message to.</param>
436+ /// <param name="message">Message to be published.</param>
437+ /// <returns>Response from called RPC endpoint or error.</returns>
438+ let request : Model -> System.TimeSpan -> string -> PublishMessage -> AsyncResult < ReceivedMessage , string > =
439+ fun ( Model model ) timeout routingKey message ->
460440 async {
461441 let tcs = System.Threading.Tasks.TaskCompletionSource< Result< ReceivedMessage, string>>()
462- use ct = new System.Threading.CancellationTokenSource( config.Timeout )
442+ use ct = new System.Threading.CancellationTokenSource( timeout )
463443
464444 let messageId = System.Guid.NewGuid() .ToString()
465445
@@ -471,22 +451,22 @@ module MqClient =
471451
472452 tcs.TrySetResult
473453 ( Error
474- ( sprintf " Publish to queue '%s ' timedout after %s s" config.Endpoint
475- ( config.Timeout .TotalSeconds.ToString()))) |> ignore),
454+ ( sprintf " Publish to queue '%s ' timedout after %s s" routingKey
455+ ( timeout .TotalSeconds.ToString()))) |> ignore),
476456 useSynchronizationContext = false )
477457
478458 try
479459 if model.pendingRequests.TryAdd( messageId, tcs) then
480460 model.rpcConsumer.Model.BasicPublish
481- ( exchange = " " , routingKey = config.Endpoint , mandatory = true ,
461+ ( exchange = " " , routingKey = routingKey , mandatory = true ,
482462 basicProperties =
483463 model.rpcConsumer.Model.CreateBasicProperties
484- ( ContentType = config.ContentType , Persistent = false , MessageId = messageId ,
485- ReplyTo = " amq.rabbitmq.reply-to" ,
464+ ( ContentType = contentTypeStringFromContent message.Content , Persistent = false ,
465+ MessageId = messageId , ReplyTo = " amq.rabbitmq.reply-to" ,
486466 Headers =
487- ( config .Headers
467+ ( message .Headers
488468 |> Map.map ( fun _ v -> v :> obj )
489- |> ( Map.toSeq >> dict))), body = config.Body )
469+ |> ( Map.toSeq >> dict))), body = bodyFromContent message.Content )
490470
491471 let! result = tcs.Task |> ( Async.AwaitTask >> Async.Catch)
492472
0 commit comments