Skip to content

Commit 97027d7

Browse files
authored
Add respond and routingKeyFromMessaage. (#1)
1 parent 82cd3d7 commit 97027d7

File tree

1 file changed

+27
-0
lines changed

1 file changed

+27
-0
lines changed

src/Insurello.RabbitMqClient/MqClient.fs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ module MqClient =
8080
{| ReplyTo = replyTo
8181
CorrelationId = messageId |}))
8282

83+
let routingKeyFromMessage: ReceivedMessage -> string = fun (Message(event, _)) -> event.RoutingKey
84+
8385
let publishConfigJson: string -> string option -> Map<string, string> -> string -> PublishConfig =
8486
fun endpoint correlationId headers message ->
8587
{ Timeout = System.TimeSpan.FromSeconds 30.0
@@ -428,6 +430,31 @@ module MqClient =
428430
PublishResult.Acked
429431
| Error errorMessage -> PublishResult.Unknown errorMessage)
430432

433+
let respond: Model -> ReceivedMessage -> string -> AsyncResult<PublishResult, string> =
434+
fun (Model model) receivedMessage response ->
435+
receivedMessage
436+
|> extractReplyProperties
437+
|> AsyncResult.fromResult
438+
|> AsyncResult.map (fun replyProperties ->
439+
let config =
440+
publishConfigJson replyProperties.ReplyTo (Some replyProperties.CorrelationId)
441+
(Map.ofList [ ("sequence_end", "true") ]) response // sequence_end is required by Rabbot clients (https://github.com/arobson/rabbot/issues/76)
442+
443+
let messageId = System.Guid.NewGuid().ToString()
444+
445+
model.channelConsumer.Model.BasicPublish
446+
(exchange = "", routingKey = config.Endpoint, mandatory = false, // mandatory must be false when publishing to direct-reply-to queue https://www.rabbitmq.com/direct-reply-to.html#limitations
447+
basicProperties =
448+
model.channelConsumer.Model.CreateBasicProperties
449+
(ContentType = config.ContentType, Persistent = true, MessageId = messageId,
450+
CorrelationId = config.CorrelationId,
451+
Headers =
452+
(config.Headers
453+
|> Map.map (fun _ v -> v :> obj)
454+
|> (Map.toSeq >> dict))), body = config.Body)
455+
456+
PublishResult.Acked)
457+
431458
let request: Model -> PublishConfig -> AsyncResult<ReceivedMessage, string> =
432459
fun (Model model) config ->
433460
async {

0 commit comments

Comments
 (0)