Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to scheduler message #3487

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions docs/adr/0024-Scheduling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# 24. Scoping dependencies inline with lifetime scope

Date: 2025-01-20

## Status

Proposed

## Context

Adding the ability to schedule message (by providing `TimeSpan` or `DateTimeOffset`) give to user flexibility to `Send`, `Publis` and `Post`.



## Decision

Giving support to schedule message, it's necessary breaking on `IAmACommandProcessor` by adding these methods:

```c#
public interface IAmACommandProcessor
{
string SchedulerSend<TRequest>(TimeSpan delay, TRequest request) where TRequest : class, IRequest;
string SchedulerSend<TRequest>(DateTimeOffset delay, TRequest request) where TRequest : class, IRequest;
Task<string> SchedulerSendAsync<TRequest>(TimeSpan delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest;
Task<string> SchedulerSendAsync<TRequest>(DateTimeOffset delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest;

string SchedulerPublish<TRequest>(TimeSpan delay, TRequest request) where TRequest : class, IRequest;
string SchedulerPublish<TRequest>(DateTimeOffset delay, TRequest request) where TRequest : class, IRequest;
Task<string> SchedulerPublishAsync<TRequest>(TimeSpan delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest;
Task<string> SchedulerPublishsync<TRequest>(DateTimeOffset delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest;

string SchedulerPost<TRequest>(TimeSpan delay, TRequest request) where TRequest : class, IRequest;
string SchedulerPost<TRequest>(DateTimeOffset delay, TRequest request) where TRequest : class, IRequest;
Task<string> SchedulerPostAsync<TRequest>(TimeSpan delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest;
Task<string> SchedulerPostAsync<TRequest>(DateTimeOffset delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest;
}
```

Scheduling can be break into 2 part (Producer & Consumer):
- Producer -> Producing a message we are going to have a new interface:

```c#
public interface IAmAMessageScheduler
{
}


public interface IAmAMessageSchedulerAsync : IAmAMessageScheduler, IDisposable
{
Task<string> ScheduleAsync<TRequest>(DateTimeOffset at, SchedulerFireType fireType, TRequest request, CancellationToken cancellationToken = default) where TRequest : class, IRequest;
Task<string> ScheduleAsync<TRequest>(TimeSpan delay, SchedulerFireType fireType, TRequest request, CancellationToken cancellationToken = default) where TRequest : class, IRequest;
Task CancelSchedulerAsync(string id, CancellationToken cancellationToken = default);
}


public interface IAmAMessageSchedulerSync : IAmAMessageScheduler, IDisposable
{
string Schedule<TRequest>(DateTimeOffset at, SchedulerFireType fireType, TRequest request) where TRequest : class, IRequest;
string Schedule<TRequest>(TimeSpan delay, SchedulerFireType fireType, TRequest request) where TRequest : class, IRequest;
void CancelScheduler(string id);
}
```

- Consumer -> To avoid duplication code we are going to introduce a new message and have a handler for that:

```c#
public class SchedulerMessageFired : Event
{
.....
}


public class SchedulerMessageFiredHandlerAsync(IAmACommandProcessor processor) : RequestHandlerAsync<SchedulerMessageFired>
{
....
}
```

So on Scheduler implementation we need to send the SchedulerMessageFired

```c#
public class JobExecute(IAmACommandProcessor processor)
{
public async Task ExecuteAsync(Arg arg)
{
await processor.SendAsync(new SchedulerMessageFired{ ... });
}
}
```
20 changes: 18 additions & 2 deletions samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ static void Main(string[] args)
}
).Create();

serviceCollection.AddBrighter()
serviceCollection
.AddSingleton<IAmAMessageSchedulerFactory>(new InMemoryMessageSchedulerFactory())
.AddBrighter()
.UseExternalBus((configure) =>
{
configure.ProducerRegistry = producerRegistry;
Expand All @@ -84,9 +86,23 @@ static void Main(string[] args)

var serviceProvider = serviceCollection.BuildServiceProvider();

var commandProcessor = serviceProvider.GetService<IAmACommandProcessor>();
var commandProcessor = serviceProvider.GetRequiredService<IAmACommandProcessor>();

commandProcessor.Post(new GreetingEvent("Ian says: Hi there!"));

// TODO Remove this code:
while (true)
{
Console.WriteLine("Enter a name to greet (Q to quit):");
var name = Console.ReadLine();
if (name is "Q" or "q")
{
break;
}

commandProcessor.SchedulerPost(TimeSpan.FromSeconds(10), new GreetingEvent($"Ian says: Hi {name}"));
}

commandProcessor.Post(new FarewellEvent("Ian says: See you later!"));
}
}
Expand Down
18 changes: 17 additions & 1 deletion samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ static void Main(string[] args)
}
}).Create();

serviceCollection.AddBrighter()
serviceCollection
.AddSingleton<IAmAMessageSchedulerFactory>(new InMemoryMessageSchedulerFactory())
.AddBrighter()
.UseExternalBus((configure) =>
{
configure.ProducerRegistry = producerRegistry;
Expand All @@ -88,6 +90,20 @@ static void Main(string[] args)
var commandProcessor = serviceProvider.GetService<IAmACommandProcessor>();

commandProcessor.Post(new GreetingEvent("Ian says: Hi there!"));

// TODO Remove this code:
while (true)
{
Console.WriteLine("Enter a name to greet (Q to quit):");
var name = Console.ReadLine();
if (name is "Q" or "q")
{
break;
}

commandProcessor.SchedulerPost(TimeSpan.FromSeconds(60), new GreetingEvent($"Ian says: Hi {name}"));
}

commandProcessor.Post(new FarewellEvent("Ian says: See you later!"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ private static object BuildCommandProcessor(IServiceProvider provider)
var requestContextFactory = provider.GetService<IAmARequestContextFactory>();

var builder = contextBuilder.RequestContextFactory(requestContextFactory);

var schedulerMessageFactory = provider.GetService<IAmAMessageSchedulerFactory>();

builder.MessageSchedulerFactory(schedulerMessageFactory);

var commandProcessor = builder.Build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public SnsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien
var messageString = message.Body.Value;
var publishRequest = new PublishRequest(_topicArn, messageString, message.Header.Subject);

if (string.IsNullOrEmpty(message.Header.CorrelationId))
{
message.Header.CorrelationId = Guid.NewGuid().ToString();
}

var messageAttributes = new Dictionary<string, MessageAttributeValue>
{
[HeaderNames.Id] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ THE SOFTWARE. */
using System.Collections.Generic;
using System.Transactions;
using Paramore.Brighter.Observability;
using Paramore.Brighter.Scheduler.Events;
using Paramore.Brighter.Scheduler.Handlers;
using Paramore.Brighter.ServiceActivator.Ports;
using Paramore.Brighter.ServiceActivator.Ports.Commands;
using Paramore.Brighter.ServiceActivator.Ports.Handlers;
Expand Down Expand Up @@ -139,6 +141,7 @@ public Dispatcher Build(string hostName)
var subscriberRegistry = new SubscriberRegistry();
subscriberRegistry.Register<ConfigurationCommand, ConfigurationCommandHandler>();
subscriberRegistry.Register<HeartbeatRequest, HeartbeatRequestCommandHandler>();
subscriberRegistry.RegisterAsync<SchedulerMessageFired, SchedulerMessageFiredHandlerAsync>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Getting worse: Large Method
Build increases from 73 to 74 lines of code, threshold = 70

Suppress


var incomingMessageMapperRegistry = new MessageMapperRegistry(
new ControlBusMessageMapperFactory(), null
Expand Down
112 changes: 108 additions & 4 deletions src/Paramore.Brighter/CommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ THE SOFTWARE. */
using Paramore.Brighter.FeatureSwitch;
using Paramore.Brighter.Logging;
using Paramore.Brighter.Observability;
using Paramore.Brighter.Scheduler.Events;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Getting worse: Code Duplication
introduced similar code in: SchedulerAsync,SchedulerAsync,SchedulerPost,SchedulerPost

Suppress

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ Getting worse: Missing Arguments Abstractions
The average number of function arguments increases from 4.12 to 4.17, threshold = 4.00

using Paramore.Brighter.Tasks;
using Polly;
using Polly.Registry;
using Exception = System.Exception;
Expand All @@ -61,6 +63,7 @@ public class CommandProcessor : IAmACommandProcessor
private readonly IAmAFeatureSwitchRegistry? _featureSwitchRegistry;
private readonly IEnumerable<Subscription>? _replySubscriptions;
private readonly IAmABrighterTracer? _tracer;
private readonly IAmAMessageSchedulerFactory? _messageSchedulerFactory;

//Uses -1 to indicate no outbox and will thus force a throw on a failed publish

Expand Down Expand Up @@ -117,6 +120,7 @@ public class CommandProcessor : IAmACommandProcessor
/// <param name="inboxConfiguration">Do we want to insert an inbox handler into pipelines without the attribute. Null (default = no), yes = how to configure</param>
/// <param name="tracer">What is the tracer we will use for telemetry</param>
/// <param name="instrumentationOptions">When creating a span for <see cref="CommandProcessor"/> operations how noisy should the attributes be</param>
/// <param name="messageSchedulerFactory">TODO: ADD description </param>
public CommandProcessor(
IAmASubscriberRegistry subscriberRegistry,
IAmAHandlerFactory handlerFactory,
Expand All @@ -125,7 +129,8 @@ public CommandProcessor(
IAmAFeatureSwitchRegistry? featureSwitchRegistry = null,
InboxConfiguration? inboxConfiguration = null,
IAmABrighterTracer? tracer = null,
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All)
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All,
IAmAMessageSchedulerFactory? messageSchedulerFactory = null)
Comment on lines +132 to +133

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ Getting worse: Constructor Over-Injection
CommandProcessor increases from 8 to 9 arguments, threshold = 5

{
_subscriberRegistry = subscriberRegistry;

Expand All @@ -144,6 +149,7 @@ public CommandProcessor(
_inboxConfiguration = inboxConfiguration;
_tracer = tracer;
_instrumentationOptions = instrumentationOptions;
_messageSchedulerFactory = messageSchedulerFactory;
}

/// <summary>
Expand All @@ -162,6 +168,7 @@ public CommandProcessor(
/// <param name="responseChannelFactory">If we are expecting a response, then we need a channel to listen on</param>
/// <param name="tracer">What is the tracer we will use for telemetry</param>
/// <param name="instrumentationOptions">When creating a span for <see cref="CommandProcessor"/> operations how noisy should the attributes be</param>
/// <param name="messageSchedulerFactory">TODO: ADD description </param>
public CommandProcessor(
IAmASubscriberRegistry subscriberRegistry,
IAmAHandlerFactory handlerFactory,
Expand All @@ -173,14 +180,15 @@ public CommandProcessor(
IEnumerable<Subscription>? replySubscriptions = null,
IAmAChannelFactory? responseChannelFactory = null,
IAmABrighterTracer? tracer = null,
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All)
: this(subscriberRegistry, handlerFactory, requestContextFactory, policyRegistry, featureSwitchRegistry, inboxConfiguration)
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All,
IAmAMessageSchedulerFactory? messageSchedulerFactory = null)
: this(subscriberRegistry, handlerFactory, requestContextFactory, policyRegistry, featureSwitchRegistry, inboxConfiguration, messageSchedulerFactory: messageSchedulerFactory)
Comment on lines +183 to +185

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ Getting worse: Constructor Over-Injection
CommandProcessor increases from 11 to 12 arguments, threshold = 5

{
_responseChannelFactory = responseChannelFactory;
_tracer = tracer;
_instrumentationOptions = instrumentationOptions;
_replySubscriptions = replySubscriptions;

InitExtServiceBus(bus);
}

Expand Down Expand Up @@ -217,6 +225,102 @@ public CommandProcessor(
InitExtServiceBus(mediator);
}

/// <inheritdoc />
public void SchedulerPost<TRequest>(TimeSpan delay, TRequest request, RequestContext? requestContext = null)
where TRequest : class, IRequest
{
if (_messageSchedulerFactory == null)
{
throw new InvalidOperationException("No message scheduler factory set.");
}

s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id);
var scheduler = _messageSchedulerFactory.Create(this);
if (scheduler is IAmAMessageSchedulerSync sync)
{
sync.Schedule(delay, SchedulerFireType.Post, request);
}
else if (scheduler is IAmAMessageSchedulerAsync asyncScheduler)
{
BrighterAsyncContext.Run(async () => await asyncScheduler.ScheduleAsync(delay, SchedulerFireType.Post, request));
}
}

/// <inheritdoc />
public void SchedulerPost<TRequest>(DateTimeOffset at,
TRequest request,
RequestContext? requestContext = null)
where TRequest : class, IRequest
{
if (_messageSchedulerFactory == null)
{
throw new InvalidOperationException("No message scheduler factory set.");
}

s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id);
var scheduler = _messageSchedulerFactory.Create(this);
if (scheduler is IAmAMessageSchedulerSync sync)
{
sync.Schedule(at, SchedulerFireType.Post, request);
}
else if (scheduler is IAmAMessageSchedulerAsync asyncScheduler)
{
BrighterAsyncContext.Run(async () => await asyncScheduler.ScheduleAsync(at, SchedulerFireType.Post, request));
}
}


/// <inheritdoc />
public async Task SchedulerAsync<TRequest>(TimeSpan delay,
TRequest request,
RequestContext? requestContext = null,
bool continueOnCapturedContext = true,
CancellationToken cancellationToken = default)
where TRequest : class, IRequest
{
if (_messageSchedulerFactory == null)
{
throw new InvalidOperationException("No message scheduler factory set.");
}

s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id);
var scheduler = _messageSchedulerFactory.Create(this);
if (scheduler is IAmAMessageSchedulerAsync asyncScheduler)
{
await asyncScheduler.ScheduleAsync(delay, SchedulerFireType.Post, request, cancellationToken);
}
else if (scheduler is IAmAMessageSchedulerSync sync)
{
sync.Schedule(delay, SchedulerFireType.Post, request);
}
}
Comment on lines +274 to +296

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ New issue: Excess Number of Function Arguments
SchedulerAsync has 5 arguments, threshold = 4


/// <inheritdoc />
public async Task SchedulerAsync<TRequest>(DateTimeOffset at,
TRequest request,
RequestContext? requestContext = null,
bool continueOnCapturedContext = true,
CancellationToken cancellationToken = default)
where TRequest : class, IRequest
{
if (_messageSchedulerFactory == null)
{
throw new InvalidOperationException("No message scheduler factory set.");
}

s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id);
var scheduler = _messageSchedulerFactory.Create(this);
if (scheduler is IAmAMessageSchedulerAsync asyncScheduler)
{
await asyncScheduler.ScheduleAsync(at, SchedulerFireType.Post, request, cancellationToken);
}
else if (scheduler is IAmAMessageSchedulerSync sync)
{
sync.Schedule(at, SchedulerFireType.Post, request);
}
}
Comment on lines +299 to +321

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ New issue: Excess Number of Function Arguments
SchedulerAsync has 5 arguments, threshold = 4



/// <summary>
/// Sends the specified command. We expect only one handler. The command is handled synchronously.
/// </summary>
Expand Down
Loading
Loading