Skip to content

Commit f371e24

Browse files
committed
FEAT: Adding new features
1 parent 31840dd commit f371e24

File tree

5 files changed

+54
-3
lines changed

5 files changed

+54
-3
lines changed
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1-
namespace AzureServiceBusFlow.Abstractions;
1+
using AzureServiceBusFlow.Models;
2+
3+
namespace AzureServiceBusFlow.Abstractions;
24

35
public interface ICommandProducer<in TCommand> where TCommand : class, IServiceBusMessage
46
{
57
Task ProduceCommandAsync(TCommand command, CancellationToken cancellationToken);
8+
9+
Task ProduceCommandAsync(TCommand command, MessageOptions messageOptions, CancellationToken cancellationToken);
610
}
Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1-
namespace AzureServiceBusFlow.Abstractions
1+
using AzureServiceBusFlow.Models;
2+
3+
namespace AzureServiceBusFlow.Abstractions
24
{
35
public interface IServiceBusProducer<in TMessage> where TMessage : class, IServiceBusMessage
46
{
57
Task ProduceAsync(TMessage message, CancellationToken cancellationToken);
8+
9+
Task ProduceAsync(
10+
TMessage message,
11+
MessageOptions producerOptions,
12+
CancellationToken cancellationToken);
613
}
714
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
namespace AzureServiceBusFlow.Models
2+
{
3+
public record MessageOptions(TimeSpan? Delay, IDictionary<string, object>? ApplicationProperties);
4+
}

src/AzureServiceBusFlow/Producers/CommandProducer.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using AzureServiceBusFlow.Abstractions;
2+
using AzureServiceBusFlow.Models;
23

34
namespace AzureServiceBusFlow.Producers
45
{
@@ -11,5 +12,10 @@ public Task ProduceCommandAsync(TCommand command, CancellationToken cancellation
1112
{
1213
return _producer.ProduceAsync(command, cancellationToken);
1314
}
15+
16+
public Task ProduceCommandAsync(TCommand command, MessageOptions messageOptions, CancellationToken cancellationToken)
17+
{
18+
return _producer.ProduceAsync(command, messageOptions, cancellationToken);
19+
}
1420
}
1521
}

src/AzureServiceBusFlow/Producers/ServiceBusProducer.cs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using Azure.Messaging.ServiceBus;
22
using AzureServiceBusFlow.Abstractions;
33
using AzureServiceBusFlow.Models;
4-
54
using Microsoft.Extensions.Logging;
65
using Newtonsoft.Json;
76

@@ -11,6 +10,7 @@ public class ServiceBusProducer<TMessage> : IServiceBusProducer<TMessage> where
1110
{
1211
private readonly ServiceBusSender _sender;
1312
private readonly ILogger _logger;
13+
1414
public ServiceBusProducer(AzureServiceBusConfiguration azureServiceBusConfiguration, string queueOrTopicName, ILogger logger)
1515
{
1616
var client = new ServiceBusClient(azureServiceBusConfiguration.ConnectionString);
@@ -36,5 +36,35 @@ public async Task ProduceAsync(TMessage message, CancellationToken cancellationT
3636

3737
_logger.LogInformation("Message {MessageType} published with successfully!", message.GetType().Name);
3838
}
39+
40+
public Task ProduceAsync(TMessage message, MessageOptions producerOptions, CancellationToken cancellationToken)
41+
{
42+
var json = JsonConvert.SerializeObject(message);
43+
var serviceBusMessage = new ServiceBusMessage(json)
44+
{
45+
Subject = (message as IServiceBusMessage)?.RoutingKey ?? message.GetType().Name,
46+
ApplicationProperties =
47+
{
48+
{ "MessageType", message.GetType().FullName },
49+
{ "CreatedAt", (message as IServiceBusMessage)?.CreatedDate.ToString("O") ?? DateTime.UtcNow.ToString("O") }
50+
}
51+
};
52+
53+
if (producerOptions?.ApplicationProperties is not null)
54+
{
55+
producerOptions?.ApplicationProperties?
56+
.Where(kvp => !serviceBusMessage.ApplicationProperties.ContainsKey(kvp.Key))
57+
.ToList()
58+
.ForEach(kvp => serviceBusMessage.ApplicationProperties.Add(kvp.Key, kvp.Value));
59+
}
60+
61+
if (producerOptions?.Delay is not null)
62+
{
63+
serviceBusMessage.ScheduledEnqueueTime = DateTimeOffset.UtcNow.Add(producerOptions.Delay.Value);
64+
}
65+
66+
return _sender.SendMessageAsync(serviceBusMessage, cancellationToken);
67+
}
68+
3969
}
4070
}

0 commit comments

Comments
 (0)