From 44af4265c61a51ef69915f7feda88d815b565693 Mon Sep 17 00:00:00 2001 From: Rafael Andrade Date: Fri, 17 Jan 2025 15:15:20 +0000 Subject: [PATCH 1/6] Init Message Scheduler V1 --- src/Paramore.Brighter/CommandProcessor.cs | 167 +++++++++++++++++- .../CommandProcessorBuilder.cs | 30 +++- src/Paramore.Brighter/IAmACommandProcessor.cs | 42 +++++ src/Paramore.Brighter/IAmAMessageScheduler.cs | 5 + .../IAmAMessageSchedulerAsync.cs | 12 ++ .../IAmAMessageSchedulerFactory.cs | 9 + .../IAmAMessageSchedulerSync.cs | 10 ++ .../IAmASchedulerMessageConsumer.cs | 6 + .../IAmASchedulerMessageConsumerAsync.cs | 8 + .../IAmASchedulerMessageConsumerSync.cs | 6 + .../IAmAnOutboxProducerMediator.cs | 2 +- .../InMemoryMessageScheduler.cs | 123 +++++++++++++ .../InMemoryMessageSchedulerFactory.cs | 31 ++++ .../OutboxProducerMediator.cs | 10 +- .../SchedulerMessageConsumer.cs | 35 ++++ 15 files changed, 485 insertions(+), 11 deletions(-) create mode 100644 src/Paramore.Brighter/IAmAMessageScheduler.cs create mode 100644 src/Paramore.Brighter/IAmAMessageSchedulerAsync.cs create mode 100644 src/Paramore.Brighter/IAmAMessageSchedulerFactory.cs create mode 100644 src/Paramore.Brighter/IAmAMessageSchedulerSync.cs create mode 100644 src/Paramore.Brighter/IAmASchedulerMessageConsumer.cs create mode 100644 src/Paramore.Brighter/IAmASchedulerMessageConsumerAsync.cs create mode 100644 src/Paramore.Brighter/IAmASchedulerMessageConsumerSync.cs create mode 100644 src/Paramore.Brighter/InMemoryMessageScheduler.cs create mode 100644 src/Paramore.Brighter/InMemoryMessageSchedulerFactory.cs create mode 100644 src/Paramore.Brighter/SchedulerMessageConsumer.cs diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index f044cd106c..98b71471a2 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -37,6 +37,7 @@ THE SOFTWARE. */ using Paramore.Brighter.FeatureSwitch; using Paramore.Brighter.Logging; using Paramore.Brighter.Observability; +using Paramore.Brighter.Tasks; using Polly; using Polly.Registry; using Exception = System.Exception; @@ -61,6 +62,7 @@ public class CommandProcessor : IAmACommandProcessor private readonly IAmAFeatureSwitchRegistry? _featureSwitchRegistry; private readonly IEnumerable? _replySubscriptions; private readonly IAmABrighterTracer? _tracer; + private readonly IAmAMessageSchedulerFactory? _messageSchedulerFactory; //Uses -1 to indicate no outbox and will thus force a throw on a failed publish @@ -117,6 +119,7 @@ public class CommandProcessor : IAmACommandProcessor /// Do we want to insert an inbox handler into pipelines without the attribute. Null (default = no), yes = how to configure /// What is the tracer we will use for telemetry /// When creating a span for operations how noisy should the attributes be + /// TODO: ADD description public CommandProcessor( IAmASubscriberRegistry subscriberRegistry, IAmAHandlerFactory handlerFactory, @@ -125,7 +128,8 @@ public CommandProcessor( IAmAFeatureSwitchRegistry? featureSwitchRegistry = null, InboxConfiguration? inboxConfiguration = null, IAmABrighterTracer? tracer = null, - InstrumentationOptions instrumentationOptions = InstrumentationOptions.All) + InstrumentationOptions instrumentationOptions = InstrumentationOptions.All, + IAmAMessageSchedulerFactory? messageSchedulerFactory = null) { _subscriberRegistry = subscriberRegistry; @@ -144,6 +148,7 @@ public CommandProcessor( _inboxConfiguration = inboxConfiguration; _tracer = tracer; _instrumentationOptions = instrumentationOptions; + _messageSchedulerFactory = messageSchedulerFactory; } /// @@ -162,6 +167,7 @@ public CommandProcessor( /// If we are expecting a response, then we need a channel to listen on /// What is the tracer we will use for telemetry /// When creating a span for operations how noisy should the attributes be + /// TODO: ADD description public CommandProcessor( IAmASubscriberRegistry subscriberRegistry, IAmAHandlerFactory handlerFactory, @@ -173,14 +179,15 @@ public CommandProcessor( IEnumerable? replySubscriptions = null, IAmAChannelFactory? responseChannelFactory = null, IAmABrighterTracer? tracer = null, - InstrumentationOptions instrumentationOptions = InstrumentationOptions.All) - : this(subscriberRegistry, handlerFactory, requestContextFactory, policyRegistry, featureSwitchRegistry, inboxConfiguration) + InstrumentationOptions instrumentationOptions = InstrumentationOptions.All, + IAmAMessageSchedulerFactory? messageSchedulerFactory = null) + : this(subscriberRegistry, handlerFactory, requestContextFactory, policyRegistry, featureSwitchRegistry, inboxConfiguration, messageSchedulerFactory: messageSchedulerFactory) { _responseChannelFactory = responseChannelFactory; _tracer = tracer; _instrumentationOptions = instrumentationOptions; _replySubscriptions = replySubscriptions; - + InitExtServiceBus(bus); } @@ -217,6 +224,158 @@ public CommandProcessor( InitExtServiceBus(mediator); } + /// + public void Scheduler(TimeSpan delay, TRequest request, RequestContext? requestContext = null) + where TRequest : class, IRequest + => Scheduler(delay, request, null, requestContext); + + public void Scheduler(TimeSpan delay, + TRequest request, + IAmABoxTransactionProvider? transactionProvider, + RequestContext? requestContext = null) + where TRequest : class, IRequest + { + if (_messageSchedulerFactory == null) + { + throw new InvalidOperationException("No message scheduler factory set."); + } + + s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id); + var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Send, request, requestContext?.Span, options: _instrumentationOptions); + var context = InitRequestContext(span, requestContext); + + var message = s_mediator!.CreateMessageFromRequest(request, context); + var scheduler = _messageSchedulerFactory.Create(s_mediator, transactionProvider); + if (scheduler is IAmAMessageSchedulerSync sync) + { + sync.Schedule(delay, message, context); + } + else if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) + { + BrighterAsyncContext.Run(async () => await asyncScheduler.ScheduleAsync(delay, message, context)); + } + } + + /// + public void Scheduler(DateTimeOffset at, + TRequest request, + RequestContext? requestContext = null) + where TRequest : class, IRequest => Scheduler(at, request, null, requestContext); + + public void Scheduler(DateTimeOffset at, + TRequest request, + IAmABoxTransactionProvider? transactionProvider, + RequestContext? requestContext = null) + where TRequest : class, IRequest + { + if (_messageSchedulerFactory == null) + { + throw new InvalidOperationException("No message scheduler factory set."); + } + + s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id); + var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Send, request, requestContext?.Span, options: _instrumentationOptions); + var context = InitRequestContext(span, requestContext); + + var message = s_mediator!.CreateMessageFromRequest(request, context); + var scheduler = _messageSchedulerFactory.Create(s_mediator, transactionProvider); + if (scheduler is IAmAMessageSchedulerSync sync) + { + sync.Schedule(at, message, context); + } + else if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) + { + BrighterAsyncContext.Run(async () => await asyncScheduler.ScheduleAsync(at, message, context)); + } + } + + + /// + public async Task SchedulerAsync(TimeSpan delay, + TRequest request, + RequestContext? requestContext = null, + bool continueOnCapturedContext = true, + CancellationToken cancellationToken = default) + where TRequest : class, IRequest => + await SchedulerAsync(delay, + request, + null, + requestContext, + continueOnCapturedContext, + cancellationToken); + + public async Task SchedulerAsync(TimeSpan delay, + TRequest request, + IAmABoxTransactionProvider? transactionProvider, + RequestContext? requestContext = null, + bool continueOnCapturedContext = true, + CancellationToken cancellationToken = default) + where TRequest : class, IRequest + { + if (_messageSchedulerFactory == null) + { + throw new InvalidOperationException("No message scheduler factory set."); + } + + s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id); + var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Send, request, requestContext?.Span, options: _instrumentationOptions); + var context = InitRequestContext(span, requestContext); + + var message = await s_mediator!.CreateMessageFromRequestAsync(request, context, cancellationToken).ConfigureAwait(continueOnCapturedContext); + var scheduler = _messageSchedulerFactory.Create(s_mediator, transactionProvider); + if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) + { + await asyncScheduler.ScheduleAsync(delay, message, context, cancellationToken).ConfigureAwait(continueOnCapturedContext); + } + else if (scheduler is IAmAMessageSchedulerSync sync) + { + sync.Schedule(delay, message, context); + } + } + + /// + public async Task SchedulerAsync(DateTimeOffset at, + TRequest request, + RequestContext? requestContext = null, + bool continueOnCapturedContext = true, + CancellationToken cancellationToken = default) + where TRequest : class, IRequest => + await SchedulerAsync(at, + request, + null, + requestContext, + continueOnCapturedContext, + cancellationToken); + + public async Task SchedulerAsync(DateTimeOffset at, + TRequest request, + IAmABoxTransactionProvider? transactionProvider, + RequestContext? requestContext = null, + bool continueOnCapturedContext = true, + CancellationToken cancellationToken = default) + where TRequest : class, IRequest + { + if (_messageSchedulerFactory == null) + { + throw new InvalidOperationException("No message scheduler factory set."); + } + + s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id); + var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Send, request, requestContext?.Span, options: _instrumentationOptions); + var context = InitRequestContext(span, requestContext); + + var message = await s_mediator!.CreateMessageFromRequestAsync(request, context, cancellationToken).ConfigureAwait(continueOnCapturedContext); + var scheduler = _messageSchedulerFactory.Create(s_mediator, transactionProvider); + if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) + { + await asyncScheduler.ScheduleAsync(at, message, context, cancellationToken).ConfigureAwait(continueOnCapturedContext); + } + else if (scheduler is IAmAMessageSchedulerSync sync) + { + sync.Schedule(at, message, context); + } + } + /// /// Sends the specified command. We expect only one handler. The command is handled synchronously. /// diff --git a/src/Paramore.Brighter/CommandProcessorBuilder.cs b/src/Paramore.Brighter/CommandProcessorBuilder.cs index 10217804e3..72d5e32257 100644 --- a/src/Paramore.Brighter/CommandProcessorBuilder.cs +++ b/src/Paramore.Brighter/CommandProcessorBuilder.cs @@ -78,7 +78,13 @@ namespace Paramore.Brighter /// /// /// - public class CommandProcessorBuilder : INeedAHandlers, INeedPolicy, INeedMessaging, INeedInstrumentation, INeedARequestContext, IAmACommandProcessorBuilder + public class CommandProcessorBuilder : INeedAHandlers, + INeedPolicy, + INeedMessaging, + INeedInstrumentation, + INeedARequestContext, + INeedAMessageSchedulerFactory, + IAmACommandProcessorBuilder { private IAmARequestContextFactory? _requestContextFactory; private IAmASubscriberRegistry? _registry; @@ -93,6 +99,7 @@ public class CommandProcessorBuilder : INeedAHandlers, INeedPolicy, INeedMessagi private InboxConfiguration? _inboxConfiguration; private InstrumentationOptions? _instrumetationOptions; private IAmABrighterTracer? _tracer; + private IAmAMessageSchedulerFactory? _messageSchedulerFactory; private CommandProcessorBuilder() { @@ -250,6 +257,12 @@ public IAmACommandProcessorBuilder RequestContextFactory(IAmARequestContextFacto _requestContextFactory = requestContextFactory; return this; } + + public IAmACommandProcessorBuilder MessageSchedulerFactory(IAmAMessageSchedulerFactory messageSchedulerFactory) + { + _messageSchedulerFactory = messageSchedulerFactory; + return this; + } /// /// Builds the from the configuration. @@ -290,7 +303,8 @@ public CommandProcessor Build() featureSwitchRegistry: _featureSwitchRegistry, inboxConfiguration: _inboxConfiguration, tracer: _tracer, - instrumentationOptions: _instrumetationOptions.Value + instrumentationOptions: _instrumetationOptions.Value, + messageSchedulerFactory: _messageSchedulerFactory ); if (_useRequestReplyQueues) @@ -305,12 +319,15 @@ public CommandProcessor Build() replySubscriptions: _replySubscriptions, responseChannelFactory: _responseChannelFactory, tracer: _tracer, - instrumentationOptions: _instrumetationOptions.Value + instrumentationOptions: _instrumetationOptions.Value, + messageSchedulerFactory: _messageSchedulerFactory ); throw new ConfigurationException( "The configuration options chosen cannot be used to construct a command processor"); } + + } #region Progressive interfaces @@ -420,6 +437,12 @@ public interface INeedARequestContext /// IAmACommandProcessorBuilder. IAmACommandProcessorBuilder RequestContextFactory(IAmARequestContextFactory requestContextFactory); } + + // TODO Add doc + public interface INeedAMessageSchedulerFactory + { + IAmACommandProcessorBuilder MessageSchedulerFactory(IAmAMessageSchedulerFactory messageSchedulerFactory); + } /// /// Interface IAmACommandProcessorBuilder @@ -432,5 +455,6 @@ public interface IAmACommandProcessorBuilder /// CommandProcessor. CommandProcessor Build(); } + #endregion } diff --git a/src/Paramore.Brighter/IAmACommandProcessor.cs b/src/Paramore.Brighter/IAmACommandProcessor.cs index e59337f811..07db59f666 100644 --- a/src/Paramore.Brighter/IAmACommandProcessor.cs +++ b/src/Paramore.Brighter/IAmACommandProcessor.cs @@ -39,6 +39,48 @@ namespace Paramore.Brighter /// public interface IAmACommandProcessor { + /// + /// Sends the specified command. + /// + /// + /// The amount of delay to be used before send the message. + /// The command. + /// The context of the request; if null we will start one via a + void Scheduler(TimeSpan delay, TRequest request, RequestContext? requestContext = null) where TRequest : class, IRequest; + + /// + /// Sends the specified command. + /// + /// + /// The that the message should be published. + /// The command. + /// The context of the request; if null we will start one via a + void Scheduler(DateTimeOffset at, TRequest request, RequestContext? requestContext = null) where TRequest : class, IRequest; + + /// + /// Awaitably sends the specified command. + /// + /// + /// The amount of delay to be used before send the message. + /// The command. + /// The context of the request; if null we will start one via a + /// Should we use the calling thread's synchronization context when continuing or a default thread synchronization context. Defaults to false + /// Allows the sender to cancel the request pipeline. Optional + /// awaitable . + Task SchedulerAsync(TimeSpan delay, TRequest request, RequestContext? requestContext = null, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest; + + /// + /// Awaitably sends the specified command. + /// + /// + /// The that the message should be published. + /// The command. + /// The context of the request; if null we will start one via a + /// Should we use the calling thread's synchronization context when continuing or a default thread synchronization context. Defaults to false + /// Allows the sender to cancel the request pipeline. Optional + /// awaitable . + Task SchedulerAsync(DateTimeOffset at, TRequest request, RequestContext? requestContext = null, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest; + /// /// Sends the specified command. /// diff --git a/src/Paramore.Brighter/IAmAMessageScheduler.cs b/src/Paramore.Brighter/IAmAMessageScheduler.cs new file mode 100644 index 0000000000..588c846d25 --- /dev/null +++ b/src/Paramore.Brighter/IAmAMessageScheduler.cs @@ -0,0 +1,5 @@ +namespace Paramore.Brighter; + +public interface IAmAMessageScheduler +{ +} diff --git a/src/Paramore.Brighter/IAmAMessageSchedulerAsync.cs b/src/Paramore.Brighter/IAmAMessageSchedulerAsync.cs new file mode 100644 index 0000000000..86ea356212 --- /dev/null +++ b/src/Paramore.Brighter/IAmAMessageSchedulerAsync.cs @@ -0,0 +1,12 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Paramore.Brighter; + +public interface IAmAMessageSchedulerAsync : IAmAMessageScheduler, IDisposable +{ + Task ScheduleAsync(DateTimeOffset at, Message message, RequestContext context, CancellationToken cancellationToken = default); + Task ScheduleAsync(TimeSpan delay, Message message, RequestContext context, CancellationToken cancellationToken = default); + Task CancelSchedulerAsync(string id, CancellationToken cancellationToken = default); +} diff --git a/src/Paramore.Brighter/IAmAMessageSchedulerFactory.cs b/src/Paramore.Brighter/IAmAMessageSchedulerFactory.cs new file mode 100644 index 0000000000..5327fe7c11 --- /dev/null +++ b/src/Paramore.Brighter/IAmAMessageSchedulerFactory.cs @@ -0,0 +1,9 @@ +namespace Paramore.Brighter; + +public interface IAmAMessageSchedulerFactory +{ + IAmAMessageScheduler Create(IAmAnOutboxProducerMediator mediator); + + IAmAMessageScheduler Create(IAmAnOutboxProducerMediator mediator, + IAmABoxTransactionProvider? transactionProvider); +} diff --git a/src/Paramore.Brighter/IAmAMessageSchedulerSync.cs b/src/Paramore.Brighter/IAmAMessageSchedulerSync.cs new file mode 100644 index 0000000000..50f06ccc61 --- /dev/null +++ b/src/Paramore.Brighter/IAmAMessageSchedulerSync.cs @@ -0,0 +1,10 @@ +using System; + +namespace Paramore.Brighter; + +public interface IAmAMessageSchedulerSync : IAmAMessageScheduler, IDisposable +{ + string Schedule(DateTimeOffset at, Message message, RequestContext context); + string Schedule(TimeSpan delay, Message message, RequestContext context); + void CancelScheduler(string id); +} diff --git a/src/Paramore.Brighter/IAmASchedulerMessageConsumer.cs b/src/Paramore.Brighter/IAmASchedulerMessageConsumer.cs new file mode 100644 index 0000000000..c917ae6c88 --- /dev/null +++ b/src/Paramore.Brighter/IAmASchedulerMessageConsumer.cs @@ -0,0 +1,6 @@ +namespace Paramore.Brighter; + +public interface IAmASchedulerMessageConsumer +{ + +} diff --git a/src/Paramore.Brighter/IAmASchedulerMessageConsumerAsync.cs b/src/Paramore.Brighter/IAmASchedulerMessageConsumerAsync.cs new file mode 100644 index 0000000000..77b87a8964 --- /dev/null +++ b/src/Paramore.Brighter/IAmASchedulerMessageConsumerAsync.cs @@ -0,0 +1,8 @@ +using System.Threading.Tasks; + +namespace Paramore.Brighter; + +public interface IAmASchedulerMessageConsumerAsync : IAmASchedulerMessageConsumer +{ + Task ConsumeAsync(Message message, RequestContext context); +} diff --git a/src/Paramore.Brighter/IAmASchedulerMessageConsumerSync.cs b/src/Paramore.Brighter/IAmASchedulerMessageConsumerSync.cs new file mode 100644 index 0000000000..ea58eed2c7 --- /dev/null +++ b/src/Paramore.Brighter/IAmASchedulerMessageConsumerSync.cs @@ -0,0 +1,6 @@ +namespace Paramore.Brighter; + +public interface IAmASchedulerMessageConsumerSync : IAmASchedulerMessageConsumer +{ + void Consume(Message message, RequestContext context); +} diff --git a/src/Paramore.Brighter/IAmAnOutboxProducerMediator.cs b/src/Paramore.Brighter/IAmAnOutboxProducerMediator.cs index 9d0e34cb3b..0fe7df1d37 100644 --- a/src/Paramore.Brighter/IAmAnOutboxProducerMediator.cs +++ b/src/Paramore.Brighter/IAmAnOutboxProducerMediator.cs @@ -15,7 +15,7 @@ public interface IAmAnOutboxProducerMediator : IDisposable /// Used with RPC to call a remote service via the external bus /// /// The message to send - /// The context of the request pipeline + /// The context of the request pipeline /// The type of the call /// The type of the response void CallViaExternalBus(Message outMessage, RequestContext? requestContext) diff --git a/src/Paramore.Brighter/InMemoryMessageScheduler.cs b/src/Paramore.Brighter/InMemoryMessageScheduler.cs new file mode 100644 index 0000000000..448203aa39 --- /dev/null +++ b/src/Paramore.Brighter/InMemoryMessageScheduler.cs @@ -0,0 +1,123 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using Paramore.Brighter.Tasks; + +namespace Paramore.Brighter; + +public class InMemoryMessageScheduler : IAmAMessageSchedulerSync +{ + private readonly SchedulerMessageCollection _messages = new(); + private readonly IAmASchedulerMessageConsumer _consumer; + + private readonly Timer _timer; + + public InMemoryMessageScheduler(IAmASchedulerMessageConsumer consumer, + TimeSpan initialDelay, + TimeSpan period) + { + _consumer = consumer; + _timer = new Timer(Consume, this, initialDelay, period); + } + + private static void Consume(object? state) + { + var scheduler = (InMemoryMessageScheduler)state!; + + var now = DateTimeOffset.UtcNow; + var schedulerMessage = scheduler._messages.Next(now); + while (schedulerMessage != null) + { + if (scheduler._consumer is IAmASchedulerMessageConsumerSync syncConsumer) + { + syncConsumer.Consume(schedulerMessage.Message, schedulerMessage.Context); + } + else if (scheduler._consumer is IAmASchedulerMessageConsumerAsync asyncConsumer) + { + var tmp = schedulerMessage; + BrighterAsyncContext.Run(async () => await asyncConsumer.ConsumeAsync(tmp.Message, tmp.Context)); + } + + // TODO Add log + schedulerMessage = scheduler._messages.Next(now); + } + } + + public string Schedule(DateTimeOffset at, Message message, RequestContext context) + { + var id = Guid.NewGuid().ToString(); + _messages.Add(new SchedulerMessage(id, message, context, at)); + return id; + } + + public string Schedule(TimeSpan delay, Message message, RequestContext context) + => Schedule(DateTimeOffset.UtcNow.Add(delay), message, context); + + public void CancelScheduler(string id) + => _messages.Delete(id); + + public void Dispose() => _timer.Dispose(); + + + private record SchedulerMessage(string Id, Message Message, RequestContext Context, DateTimeOffset At); + + private class SchedulerMessageCollection + { + // It's a sorted list + private readonly object _lock = new(); + private readonly LinkedList _messages = new(); + + public SchedulerMessage? Next(DateTimeOffset now) + { + lock (_lock) + { + var first = _messages.First?.Value; + if (first == null || first.At >= now) + { + return null; + } + + _messages.RemoveFirst(); + return first; + } + } + + public void Add(SchedulerMessage message) + { + lock (_lock) + { + var node = _messages.First; + while (node != null) + { + if (node.Value.At > message.At) + { + _messages.AddBefore(node, message); + return; + } + + node = node.Next; + } + + _messages.AddLast(message); + } + } + + public void Delete(string id) + { + lock (_lock) + { + var node = _messages.First; + while (node != null) + { + if (node.Value.Id == id) + { + _messages.Remove(node); + return; + } + + node = node.Next; + } + } + } + } +} diff --git a/src/Paramore.Brighter/InMemoryMessageSchedulerFactory.cs b/src/Paramore.Brighter/InMemoryMessageSchedulerFactory.cs new file mode 100644 index 0000000000..345df932dd --- /dev/null +++ b/src/Paramore.Brighter/InMemoryMessageSchedulerFactory.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Transactions; + +namespace Paramore.Brighter; + +public class InMemoryMessageSchedulerFactory(TimeSpan initialDelay, TimeSpan period) : IAmAMessageSchedulerFactory +{ + public InMemoryMessageSchedulerFactory() + : this(TimeSpan.Zero, TimeSpan.FromSeconds(1)) + { + } + + private static readonly Dictionary s_schedulers = new(); + + public IAmAMessageScheduler Create(IAmAnOutboxProducerMediator mediator) + => Create(mediator, null); + + public IAmAMessageScheduler Create(IAmAnOutboxProducerMediator mediator, + IAmABoxTransactionProvider? transactionProvider) + { + if (!s_schedulers.TryGetValue(typeof(TTransaction), out var scheduler)) + { + var consumer = new SchedulerMessageConsumer(mediator, transactionProvider); + scheduler = new InMemoryMessageScheduler(consumer, initialDelay, period); + s_schedulers[typeof(TTransaction)] = scheduler; + } + + return scheduler; + } +} diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index 523e283213..e5edb0ac35 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -58,6 +58,7 @@ public class OutboxProducerMediator : IAmAnOutboxProduce private readonly IAmAProducerRegistry _producerRegistry; private readonly InstrumentationOptions _instrumentationOptions; private readonly Dictionary> _outboxBatches = new(); + private readonly IAmAMessageSchedulerFactory? _messageSchedulerFactory; private static readonly SemaphoreSlim s_clearSemaphoreToken = new(1, 1); @@ -104,7 +105,7 @@ public OutboxProducerMediator( IAmAMessageMapperRegistry mapperRegistry, IAmAMessageTransformerFactory messageTransformerFactory, IAmAMessageTransformerFactoryAsync messageTransformerFactoryAsync, - IAmABrighterTracer tracer, + IAmABrighterTracer tracer, IAmAnOutbox? outbox = null, IAmARequestContextFactory? requestContextFactory = null, int outboxTimeout = 300, @@ -112,7 +113,8 @@ public OutboxProducerMediator( TimeSpan? maxOutStandingCheckInterval = null, Dictionary? outBoxBag = null, TimeProvider? timeProvider = null, - InstrumentationOptions instrumentationOptions = InstrumentationOptions.All) + InstrumentationOptions instrumentationOptions = InstrumentationOptions.All, + IAmAMessageSchedulerFactory? messageSchedulerFactory = null) { _producerRegistry = producerRegistry ?? throw new ConfigurationException("Missing Producer Registry for External Bus Services"); @@ -151,6 +153,7 @@ public OutboxProducerMediator( _outBoxBag = outBoxBag ?? new Dictionary(); _instrumentationOptions = instrumentationOptions; _tracer = tracer; + _messageSchedulerFactory = messageSchedulerFactory; ConfigureCallbacks(requestContextFactory.Create()); } @@ -748,7 +751,8 @@ private bool ConfigurePublisherCallbackMaybe(IAmAMessageProducer producer, Reque return false; } - private void Dispatch(IEnumerable posts, RequestContext requestContext, + private void Dispatch(IEnumerable posts, + RequestContext requestContext, Dictionary? args = null) { var parentSpan = requestContext.Span; diff --git a/src/Paramore.Brighter/SchedulerMessageConsumer.cs b/src/Paramore.Brighter/SchedulerMessageConsumer.cs new file mode 100644 index 0000000000..45cc2f96e6 --- /dev/null +++ b/src/Paramore.Brighter/SchedulerMessageConsumer.cs @@ -0,0 +1,35 @@ +using System; +using System.Threading.Tasks; + +namespace Paramore.Brighter; + +public class SchedulerMessageConsumer( + IAmAnOutboxProducerMediator mediator, + IAmABoxTransactionProvider? transactionProvider) : + IAmASchedulerMessageConsumerSync, + IAmASchedulerMessageConsumerAsync +{ + public async Task ConsumeAsync(Message message, RequestContext context) + { + if (!mediator.HasOutbox()) + { + throw new InvalidOperationException("No outbox defined."); + } + + var outbox = (IAmAnOutboxProducerMediator)mediator; + await outbox.AddToOutboxAsync(message, context, transactionProvider); + await outbox.ClearOutboxAsync([message.Id], context); + } + + public void Consume(Message message, RequestContext context) + { + if (!mediator.HasOutbox()) + { + throw new InvalidOperationException("No outbox defined."); + } + + var outbox = (IAmAnOutboxProducerMediator)mediator; + outbox.AddToOutbox(message, context, transactionProvider); + outbox.ClearOutbox([message.Id], context); + } +} From b8d365bced89a2ef1cee9581db8dfce0152b0fdf Mon Sep 17 00:00:00 2001 From: Rafael Andrade Date: Fri, 17 Jan 2025 16:28:31 +0000 Subject: [PATCH 2/6] Init support to scheduler messages --- docker-compose-rmq.yaml | 11 +---------- .../AWSTaskQueue/GreetingsSender/Program.cs | 18 +++++++++++++++++- .../RMQTaskQueue/GreetingsSender/Program.cs | 18 +++++++++++++++++- .../ServiceCollectionExtensions.cs | 4 ++++ .../SnsMessagePublisher.cs | 5 +++++ .../CommandProcessorBuilder.cs | 4 +++- .../OutboxProducerMediator.cs | 5 +---- .../SchedulerMessageConsumer.cs | 5 +++++ 8 files changed, 53 insertions(+), 17 deletions(-) diff --git a/docker-compose-rmq.yaml b/docker-compose-rmq.yaml index a2846c8a1a..5b2b5f43d0 100644 --- a/docker-compose-rmq.yaml +++ b/docker-compose-rmq.yaml @@ -1,16 +1,7 @@ -version: '3' - services: rabbitmq: - image: brightercommand/rabbitmq:latest + image: masstransit/rabbitmq platform: linux/arm64 ports: - "5672:5672" - "15672:15672" - volumes: - - rabbitmq-home:/var/lib/rabbitmq - -volumes: - rabbitmq-home: - driver: local - \ No newline at end of file diff --git a/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs b/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs index 632152f373..d7feba70dc 100644 --- a/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs +++ b/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs @@ -75,7 +75,9 @@ static void Main(string[] args) } ).Create(); - serviceCollection.AddBrighter() + serviceCollection + .AddSingleton(new InMemoryMessageSchedulerFactory()) + .AddBrighter() .UseExternalBus((configure) => { configure.ProducerRegistry = producerRegistry; @@ -87,6 +89,20 @@ static void Main(string[] args) var commandProcessor = serviceProvider.GetService(); commandProcessor.Post(new GreetingEvent("Ian says: Hi there!")); + + // TODO Remove this code: + while (true) + { + Console.WriteLine("Enter a name to greet (Q to quit):"); + var name = Console.ReadLine(); + if (name is "Q" or "q") + { + break; + } + + commandProcessor.Scheduler(TimeSpan.FromSeconds(1), new GreetingEvent($"Ian says: Hi {name}")); + } + commandProcessor.Post(new FarewellEvent("Ian says: See you later!")); } } diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs index d671b4b63b..1a1d1eadf1 100644 --- a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs @@ -74,7 +74,9 @@ static void Main(string[] args) } }).Create(); - serviceCollection.AddBrighter() + serviceCollection + .AddSingleton(new InMemoryMessageSchedulerFactory()) + .AddBrighter() .UseExternalBus((configure) => { configure.ProducerRegistry = producerRegistry; @@ -88,6 +90,20 @@ static void Main(string[] args) var commandProcessor = serviceProvider.GetService(); commandProcessor.Post(new GreetingEvent("Ian says: Hi there!")); + + // TODO Remove this code: + while (true) + { + Console.WriteLine("Enter a name to greet (Q to quit):"); + var name = Console.ReadLine(); + if (name is "Q" or "q") + { + break; + } + + commandProcessor.Scheduler(TimeSpan.FromSeconds(60), new GreetingEvent($"Ian says: Hi {name}")); + } + commandProcessor.Post(new FarewellEvent("Ian says: See you later!")); } } diff --git a/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs b/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs index 035e195b73..d9ace9c8a6 100644 --- a/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs @@ -315,6 +315,10 @@ private static object BuildCommandProcessor(IServiceProvider provider) var requestContextFactory = provider.GetService(); var builder = contextBuilder.RequestContextFactory(requestContextFactory); + + var schedulerMessageFactory = provider.GetService(); + + builder.MessageSchedulerFactory(schedulerMessageFactory); var commandProcessor = builder.Build(); diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs index caf9bebd0d..84bda410de 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs @@ -50,6 +50,11 @@ public SnsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien var messageString = message.Body.Value; var publishRequest = new PublishRequest(_topicArn, messageString, message.Header.Subject); + if (string.IsNullOrEmpty(message.Header.CorrelationId)) + { + message.Header.CorrelationId = Guid.NewGuid().ToString(); + } + var messageAttributes = new Dictionary { [HeaderNames.Id] = diff --git a/src/Paramore.Brighter/CommandProcessorBuilder.cs b/src/Paramore.Brighter/CommandProcessorBuilder.cs index 72d5e32257..2c8c1adb53 100644 --- a/src/Paramore.Brighter/CommandProcessorBuilder.cs +++ b/src/Paramore.Brighter/CommandProcessorBuilder.cs @@ -441,7 +441,7 @@ public interface INeedARequestContext // TODO Add doc public interface INeedAMessageSchedulerFactory { - IAmACommandProcessorBuilder MessageSchedulerFactory(IAmAMessageSchedulerFactory messageSchedulerFactory); + } /// @@ -449,6 +449,8 @@ public interface INeedAMessageSchedulerFactory /// public interface IAmACommandProcessorBuilder { + IAmACommandProcessorBuilder MessageSchedulerFactory(IAmAMessageSchedulerFactory messageSchedulerFactory); + /// /// Builds this instance. /// diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index e5edb0ac35..d4c43d588a 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -58,7 +58,6 @@ public class OutboxProducerMediator : IAmAnOutboxProduce private readonly IAmAProducerRegistry _producerRegistry; private readonly InstrumentationOptions _instrumentationOptions; private readonly Dictionary> _outboxBatches = new(); - private readonly IAmAMessageSchedulerFactory? _messageSchedulerFactory; private static readonly SemaphoreSlim s_clearSemaphoreToken = new(1, 1); @@ -113,8 +112,7 @@ public OutboxProducerMediator( TimeSpan? maxOutStandingCheckInterval = null, Dictionary? outBoxBag = null, TimeProvider? timeProvider = null, - InstrumentationOptions instrumentationOptions = InstrumentationOptions.All, - IAmAMessageSchedulerFactory? messageSchedulerFactory = null) + InstrumentationOptions instrumentationOptions = InstrumentationOptions.All) { _producerRegistry = producerRegistry ?? throw new ConfigurationException("Missing Producer Registry for External Bus Services"); @@ -153,7 +151,6 @@ public OutboxProducerMediator( _outBoxBag = outBoxBag ?? new Dictionary(); _instrumentationOptions = instrumentationOptions; _tracer = tracer; - _messageSchedulerFactory = messageSchedulerFactory; ConfigureCallbacks(requestContextFactory.Create()); } diff --git a/src/Paramore.Brighter/SchedulerMessageConsumer.cs b/src/Paramore.Brighter/SchedulerMessageConsumer.cs index 45cc2f96e6..c05b97164b 100644 --- a/src/Paramore.Brighter/SchedulerMessageConsumer.cs +++ b/src/Paramore.Brighter/SchedulerMessageConsumer.cs @@ -1,5 +1,7 @@ using System; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Paramore.Brighter.Logging; namespace Paramore.Brighter; @@ -9,8 +11,10 @@ public class SchedulerMessageConsumer( IAmASchedulerMessageConsumerSync, IAmASchedulerMessageConsumerAsync { + private static readonly ILogger s_logger = ApplicationLogging.CreateLogger>(); public async Task ConsumeAsync(Message message, RequestContext context) { + s_logger.LogInformation("Publishing scheduler message"); if (!mediator.HasOutbox()) { throw new InvalidOperationException("No outbox defined."); @@ -23,6 +27,7 @@ public async Task ConsumeAsync(Message message, RequestContext context) public void Consume(Message message, RequestContext context) { + s_logger.LogInformation("Publishing scheduler message"); if (!mediator.HasOutbox()) { throw new InvalidOperationException("No outbox defined."); From 6ddfa0c97d511d5e34a96938caad1bdc7230d5bb Mon Sep 17 00:00:00 2001 From: Rafael Andrade Date: Mon, 20 Jan 2025 12:03:23 +0000 Subject: [PATCH 3/6] Improve scheduler --- .../AWSTaskQueue/GreetingsSender/Program.cs | 4 +- .../RMQTaskQueue/GreetingsSender/Program.cs | 2 +- .../ControlBus/ControlBusReceiverBuilder.cs | 3 + src/Paramore.Brighter/CommandProcessor.cs | 173 ++++++------------ src/Paramore.Brighter/IAmACommandProcessor.cs | 4 +- .../IAmAMessageSchedulerAsync.cs | 7 +- .../IAmAMessageSchedulerFactory.cs | 5 +- .../IAmAMessageSchedulerSync.cs | 7 +- .../IAmASchedulerMessageConsumer.cs | 6 - .../IAmASchedulerMessageConsumerAsync.cs | 8 - .../IAmASchedulerMessageConsumerSync.cs | 6 - .../InMemoryMessageScheduler.cs | 43 +++-- .../InMemoryMessageSchedulerFactory.cs | 26 +-- .../CommandProcessorSpanOperation.cs | 3 +- .../Scheduler/Events/SchedulerMessageFired.cs | 17 ++ .../Handlers/SchedulerMessageFiredHandler.cs | 104 +++++++++++ .../SchedulerMessageFiredHandlerAsync.cs | 123 +++++++++++++ .../SchedulerMessageConsumer.cs | 40 ---- 18 files changed, 363 insertions(+), 218 deletions(-) delete mode 100644 src/Paramore.Brighter/IAmASchedulerMessageConsumer.cs delete mode 100644 src/Paramore.Brighter/IAmASchedulerMessageConsumerAsync.cs delete mode 100644 src/Paramore.Brighter/IAmASchedulerMessageConsumerSync.cs create mode 100644 src/Paramore.Brighter/Scheduler/Events/SchedulerMessageFired.cs create mode 100644 src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandler.cs create mode 100644 src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandlerAsync.cs delete mode 100644 src/Paramore.Brighter/SchedulerMessageConsumer.cs diff --git a/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs b/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs index d7feba70dc..1b3add8d41 100644 --- a/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs +++ b/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs @@ -86,7 +86,7 @@ static void Main(string[] args) var serviceProvider = serviceCollection.BuildServiceProvider(); - var commandProcessor = serviceProvider.GetService(); + var commandProcessor = serviceProvider.GetRequiredService(); commandProcessor.Post(new GreetingEvent("Ian says: Hi there!")); @@ -100,7 +100,7 @@ static void Main(string[] args) break; } - commandProcessor.Scheduler(TimeSpan.FromSeconds(1), new GreetingEvent($"Ian says: Hi {name}")); + commandProcessor.SchedulerPost(TimeSpan.FromSeconds(10), new GreetingEvent($"Ian says: Hi {name}")); } commandProcessor.Post(new FarewellEvent("Ian says: See you later!")); diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs index 1a1d1eadf1..e6444cbbce 100644 --- a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs @@ -101,7 +101,7 @@ static void Main(string[] args) break; } - commandProcessor.Scheduler(TimeSpan.FromSeconds(60), new GreetingEvent($"Ian says: Hi {name}")); + commandProcessor.SchedulerPost(TimeSpan.FromSeconds(60), new GreetingEvent($"Ian says: Hi {name}")); } commandProcessor.Post(new FarewellEvent("Ian says: See you later!")); diff --git a/src/Paramore.Brighter.ServiceActivator/ControlBus/ControlBusReceiverBuilder.cs b/src/Paramore.Brighter.ServiceActivator/ControlBus/ControlBusReceiverBuilder.cs index 55f92b51d4..e757e16b7e 100644 --- a/src/Paramore.Brighter.ServiceActivator/ControlBus/ControlBusReceiverBuilder.cs +++ b/src/Paramore.Brighter.ServiceActivator/ControlBus/ControlBusReceiverBuilder.cs @@ -26,6 +26,8 @@ THE SOFTWARE. */ using System.Collections.Generic; using System.Transactions; using Paramore.Brighter.Observability; +using Paramore.Brighter.Scheduler.Events; +using Paramore.Brighter.Scheduler.Handlers; using Paramore.Brighter.ServiceActivator.Ports; using Paramore.Brighter.ServiceActivator.Ports.Commands; using Paramore.Brighter.ServiceActivator.Ports.Handlers; @@ -139,6 +141,7 @@ public Dispatcher Build(string hostName) var subscriberRegistry = new SubscriberRegistry(); subscriberRegistry.Register(); subscriberRegistry.Register(); + subscriberRegistry.RegisterAsync(); var incomingMessageMapperRegistry = new MessageMapperRegistry( new ControlBusMessageMapperFactory(), null diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index 98b71471a2..22068c6e79 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -37,6 +37,7 @@ THE SOFTWARE. */ using Paramore.Brighter.FeatureSwitch; using Paramore.Brighter.Logging; using Paramore.Brighter.Observability; +using Paramore.Brighter.Scheduler.Events; using Paramore.Brighter.Tasks; using Polly; using Polly.Registry; @@ -225,70 +226,49 @@ public CommandProcessor( } /// - public void Scheduler(TimeSpan delay, TRequest request, RequestContext? requestContext = null) - where TRequest : class, IRequest - => Scheduler(delay, request, null, requestContext); - - public void Scheduler(TimeSpan delay, - TRequest request, - IAmABoxTransactionProvider? transactionProvider, - RequestContext? requestContext = null) - where TRequest : class, IRequest - { - if (_messageSchedulerFactory == null) - { - throw new InvalidOperationException("No message scheduler factory set."); - } + public void SchedulerPost(TimeSpan delay, TRequest request, RequestContext? requestContext = null) + where TRequest : class, IRequest + { + if (_messageSchedulerFactory == null) + { + throw new InvalidOperationException("No message scheduler factory set."); + } - s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id); - var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Send, request, requestContext?.Span, options: _instrumentationOptions); - var context = InitRequestContext(span, requestContext); - - var message = s_mediator!.CreateMessageFromRequest(request, context); - var scheduler = _messageSchedulerFactory.Create(s_mediator, transactionProvider); - if (scheduler is IAmAMessageSchedulerSync sync) - { - sync.Schedule(delay, message, context); - } - else if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) - { - BrighterAsyncContext.Run(async () => await asyncScheduler.ScheduleAsync(delay, message, context)); - } - } + s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id); + var scheduler = _messageSchedulerFactory.Create(this); + if (scheduler is IAmAMessageSchedulerSync sync) + { + sync.Schedule(delay, SchedulerFireType.Post, request); + } + else if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) + { + BrighterAsyncContext.Run(async () => await asyncScheduler.ScheduleAsync(delay, SchedulerFireType.Post, request)); + } + } /// - public void Scheduler(DateTimeOffset at, + public void SchedulerPost(DateTimeOffset at, TRequest request, RequestContext? requestContext = null) - where TRequest : class, IRequest => Scheduler(at, request, null, requestContext); - - public void Scheduler(DateTimeOffset at, - TRequest request, - IAmABoxTransactionProvider? transactionProvider, - RequestContext? requestContext = null) where TRequest : class, IRequest { - if (_messageSchedulerFactory == null) + if (_messageSchedulerFactory == null) { throw new InvalidOperationException("No message scheduler factory set."); } s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id); - var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Send, request, requestContext?.Span, options: _instrumentationOptions); - var context = InitRequestContext(span, requestContext); - - var message = s_mediator!.CreateMessageFromRequest(request, context); - var scheduler = _messageSchedulerFactory.Create(s_mediator, transactionProvider); - if (scheduler is IAmAMessageSchedulerSync sync) - { - sync.Schedule(at, message, context); - } - else if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) - { - BrighterAsyncContext.Run(async () => await asyncScheduler.ScheduleAsync(at, message, context)); - } + var scheduler = _messageSchedulerFactory.Create(this); + if (scheduler is IAmAMessageSchedulerSync sync) + { + sync.Schedule(at, SchedulerFireType.Post, request); + } + else if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) + { + BrighterAsyncContext.Run(async () => await asyncScheduler.ScheduleAsync(at, SchedulerFireType.Post, request)); + } } - + /// public async Task SchedulerAsync(TimeSpan delay, @@ -296,41 +276,23 @@ public async Task SchedulerAsync(TimeSpan delay, RequestContext? requestContext = null, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) - where TRequest : class, IRequest => - await SchedulerAsync(delay, - request, - null, - requestContext, - continueOnCapturedContext, - cancellationToken); - - public async Task SchedulerAsync(TimeSpan delay, - TRequest request, - IAmABoxTransactionProvider? transactionProvider, - RequestContext? requestContext = null, - bool continueOnCapturedContext = true, - CancellationToken cancellationToken = default) where TRequest : class, IRequest - { + { if (_messageSchedulerFactory == null) { throw new InvalidOperationException("No message scheduler factory set."); } s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id); - var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Send, request, requestContext?.Span, options: _instrumentationOptions); - var context = InitRequestContext(span, requestContext); - - var message = await s_mediator!.CreateMessageFromRequestAsync(request, context, cancellationToken).ConfigureAwait(continueOnCapturedContext); - var scheduler = _messageSchedulerFactory.Create(s_mediator, transactionProvider); - if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) - { - await asyncScheduler.ScheduleAsync(delay, message, context, cancellationToken).ConfigureAwait(continueOnCapturedContext); - } - else if (scheduler is IAmAMessageSchedulerSync sync) - { - sync.Schedule(delay, message, context); - } + var scheduler = _messageSchedulerFactory.Create(this); + if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) + { + await asyncScheduler.ScheduleAsync(delay, SchedulerFireType.Post, request, cancellationToken); + } + else if (scheduler is IAmAMessageSchedulerSync sync) + { + sync.Schedule(delay, SchedulerFireType.Post, request); + } } /// @@ -339,42 +301,25 @@ public async Task SchedulerAsync(DateTimeOffset at, RequestContext? requestContext = null, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) - where TRequest : class, IRequest => - await SchedulerAsync(at, - request, - null, - requestContext, - continueOnCapturedContext, - cancellationToken); + where TRequest : class, IRequest + { + if (_messageSchedulerFactory == null) + { + throw new InvalidOperationException("No message scheduler factory set."); + } + + s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id); + var scheduler = _messageSchedulerFactory.Create(this); + if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) + { + await asyncScheduler.ScheduleAsync(at, SchedulerFireType.Post, request, cancellationToken); + } + else if (scheduler is IAmAMessageSchedulerSync sync) + { + sync.Schedule(at, SchedulerFireType.Post, request); + } + } - public async Task SchedulerAsync(DateTimeOffset at, - TRequest request, - IAmABoxTransactionProvider? transactionProvider, - RequestContext? requestContext = null, - bool continueOnCapturedContext = true, - CancellationToken cancellationToken = default) - where TRequest : class, IRequest - { - if (_messageSchedulerFactory == null) - { - throw new InvalidOperationException("No message scheduler factory set."); - } - - s_logger.LogInformation("Scheduling request: {RequestType} {Id}", request.GetType(), request.Id); - var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Send, request, requestContext?.Span, options: _instrumentationOptions); - var context = InitRequestContext(span, requestContext); - - var message = await s_mediator!.CreateMessageFromRequestAsync(request, context, cancellationToken).ConfigureAwait(continueOnCapturedContext); - var scheduler = _messageSchedulerFactory.Create(s_mediator, transactionProvider); - if (scheduler is IAmAMessageSchedulerAsync asyncScheduler) - { - await asyncScheduler.ScheduleAsync(at, message, context, cancellationToken).ConfigureAwait(continueOnCapturedContext); - } - else if (scheduler is IAmAMessageSchedulerSync sync) - { - sync.Schedule(at, message, context); - } - } /// /// Sends the specified command. We expect only one handler. The command is handled synchronously. diff --git a/src/Paramore.Brighter/IAmACommandProcessor.cs b/src/Paramore.Brighter/IAmACommandProcessor.cs index 07db59f666..9212adc7f0 100644 --- a/src/Paramore.Brighter/IAmACommandProcessor.cs +++ b/src/Paramore.Brighter/IAmACommandProcessor.cs @@ -46,7 +46,7 @@ public interface IAmACommandProcessor /// The amount of delay to be used before send the message. /// The command. /// The context of the request; if null we will start one via a - void Scheduler(TimeSpan delay, TRequest request, RequestContext? requestContext = null) where TRequest : class, IRequest; + void SchedulerPost(TimeSpan delay, TRequest request, RequestContext? requestContext = null) where TRequest : class, IRequest; /// /// Sends the specified command. @@ -55,7 +55,7 @@ public interface IAmACommandProcessor /// The that the message should be published. /// The command. /// The context of the request; if null we will start one via a - void Scheduler(DateTimeOffset at, TRequest request, RequestContext? requestContext = null) where TRequest : class, IRequest; + void SchedulerPost(DateTimeOffset at, TRequest request, RequestContext? requestContext = null) where TRequest : class, IRequest; /// /// Awaitably sends the specified command. diff --git a/src/Paramore.Brighter/IAmAMessageSchedulerAsync.cs b/src/Paramore.Brighter/IAmAMessageSchedulerAsync.cs index 86ea356212..80751b0068 100644 --- a/src/Paramore.Brighter/IAmAMessageSchedulerAsync.cs +++ b/src/Paramore.Brighter/IAmAMessageSchedulerAsync.cs @@ -1,12 +1,15 @@ using System; using System.Threading; using System.Threading.Tasks; +using Paramore.Brighter.Scheduler.Events; namespace Paramore.Brighter; public interface IAmAMessageSchedulerAsync : IAmAMessageScheduler, IDisposable { - Task ScheduleAsync(DateTimeOffset at, Message message, RequestContext context, CancellationToken cancellationToken = default); - Task ScheduleAsync(TimeSpan delay, Message message, RequestContext context, CancellationToken cancellationToken = default); + Task ScheduleAsync(DateTimeOffset at, SchedulerFireType fireType, TRequest request, CancellationToken cancellationToken = default) + where TRequest : class, IRequest; + Task ScheduleAsync(TimeSpan delay, SchedulerFireType fireType, TRequest request, CancellationToken cancellationToken = default) + where TRequest : class, IRequest; Task CancelSchedulerAsync(string id, CancellationToken cancellationToken = default); } diff --git a/src/Paramore.Brighter/IAmAMessageSchedulerFactory.cs b/src/Paramore.Brighter/IAmAMessageSchedulerFactory.cs index 5327fe7c11..beaf2b1567 100644 --- a/src/Paramore.Brighter/IAmAMessageSchedulerFactory.cs +++ b/src/Paramore.Brighter/IAmAMessageSchedulerFactory.cs @@ -2,8 +2,5 @@ public interface IAmAMessageSchedulerFactory { - IAmAMessageScheduler Create(IAmAnOutboxProducerMediator mediator); - - IAmAMessageScheduler Create(IAmAnOutboxProducerMediator mediator, - IAmABoxTransactionProvider? transactionProvider); + IAmAMessageScheduler Create(IAmACommandProcessor processor); } diff --git a/src/Paramore.Brighter/IAmAMessageSchedulerSync.cs b/src/Paramore.Brighter/IAmAMessageSchedulerSync.cs index 50f06ccc61..0deac43136 100644 --- a/src/Paramore.Brighter/IAmAMessageSchedulerSync.cs +++ b/src/Paramore.Brighter/IAmAMessageSchedulerSync.cs @@ -1,10 +1,13 @@ using System; +using Paramore.Brighter.Scheduler.Events; namespace Paramore.Brighter; public interface IAmAMessageSchedulerSync : IAmAMessageScheduler, IDisposable { - string Schedule(DateTimeOffset at, Message message, RequestContext context); - string Schedule(TimeSpan delay, Message message, RequestContext context); + string Schedule(DateTimeOffset at, SchedulerFireType fireType, TRequest request) + where TRequest : class, IRequest; + string Schedule(TimeSpan delay, SchedulerFireType fireType, TRequest request) + where TRequest : class, IRequest; void CancelScheduler(string id); } diff --git a/src/Paramore.Brighter/IAmASchedulerMessageConsumer.cs b/src/Paramore.Brighter/IAmASchedulerMessageConsumer.cs deleted file mode 100644 index c917ae6c88..0000000000 --- a/src/Paramore.Brighter/IAmASchedulerMessageConsumer.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Paramore.Brighter; - -public interface IAmASchedulerMessageConsumer -{ - -} diff --git a/src/Paramore.Brighter/IAmASchedulerMessageConsumerAsync.cs b/src/Paramore.Brighter/IAmASchedulerMessageConsumerAsync.cs deleted file mode 100644 index 77b87a8964..0000000000 --- a/src/Paramore.Brighter/IAmASchedulerMessageConsumerAsync.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System.Threading.Tasks; - -namespace Paramore.Brighter; - -public interface IAmASchedulerMessageConsumerAsync : IAmASchedulerMessageConsumer -{ - Task ConsumeAsync(Message message, RequestContext context); -} diff --git a/src/Paramore.Brighter/IAmASchedulerMessageConsumerSync.cs b/src/Paramore.Brighter/IAmASchedulerMessageConsumerSync.cs deleted file mode 100644 index ea58eed2c7..0000000000 --- a/src/Paramore.Brighter/IAmASchedulerMessageConsumerSync.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Paramore.Brighter; - -public interface IAmASchedulerMessageConsumerSync : IAmASchedulerMessageConsumer -{ - void Consume(Message message, RequestContext context); -} diff --git a/src/Paramore.Brighter/InMemoryMessageScheduler.cs b/src/Paramore.Brighter/InMemoryMessageScheduler.cs index 448203aa39..58fcc20faa 100644 --- a/src/Paramore.Brighter/InMemoryMessageScheduler.cs +++ b/src/Paramore.Brighter/InMemoryMessageScheduler.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Text.Json; using System.Threading; +using Paramore.Brighter.Scheduler.Events; using Paramore.Brighter.Tasks; namespace Paramore.Brighter; @@ -8,15 +10,15 @@ namespace Paramore.Brighter; public class InMemoryMessageScheduler : IAmAMessageSchedulerSync { private readonly SchedulerMessageCollection _messages = new(); - private readonly IAmASchedulerMessageConsumer _consumer; + private readonly IAmACommandProcessor _processor; private readonly Timer _timer; - public InMemoryMessageScheduler(IAmASchedulerMessageConsumer consumer, + public InMemoryMessageScheduler(IAmACommandProcessor processor, TimeSpan initialDelay, TimeSpan period) { - _consumer = consumer; + _processor = processor; _timer = new Timer(Consume, this, initialDelay, period); } @@ -28,38 +30,43 @@ private static void Consume(object? state) var schedulerMessage = scheduler._messages.Next(now); while (schedulerMessage != null) { - if (scheduler._consumer is IAmASchedulerMessageConsumerSync syncConsumer) + BrighterAsyncContext.Run(async () => await scheduler._processor.SendAsync(new SchedulerMessageFired(schedulerMessage.Id) { - syncConsumer.Consume(schedulerMessage.Message, schedulerMessage.Context); - } - else if (scheduler._consumer is IAmASchedulerMessageConsumerAsync asyncConsumer) - { - var tmp = schedulerMessage; - BrighterAsyncContext.Run(async () => await asyncConsumer.ConsumeAsync(tmp.Message, tmp.Context)); - } + FireType = schedulerMessage.FireType, + MessageType = schedulerMessage.MessageType, + MessageData = schedulerMessage.MessageData, + })); // TODO Add log schedulerMessage = scheduler._messages.Next(now); } } - public string Schedule(DateTimeOffset at, Message message, RequestContext context) + public string Schedule(DateTimeOffset at, SchedulerFireType fireType, TRequest request) + where TRequest : class, IRequest { var id = Guid.NewGuid().ToString(); - _messages.Add(new SchedulerMessage(id, message, context, at)); + _messages.Add(new SchedulerMessage(id, at, fireType, + typeof(TRequest).FullName!, + JsonSerializer.Serialize(request, JsonSerialisationOptions.Options))); return id; } - public string Schedule(TimeSpan delay, Message message, RequestContext context) - => Schedule(DateTimeOffset.UtcNow.Add(delay), message, context); + public string Schedule(TimeSpan delay, SchedulerFireType fireType, TRequest request) + where TRequest : class, IRequest + => Schedule(DateTimeOffset.UtcNow.Add(delay), fireType, request); - public void CancelScheduler(string id) + public void CancelScheduler(string id) => _messages.Delete(id); public void Dispose() => _timer.Dispose(); - - private record SchedulerMessage(string Id, Message Message, RequestContext Context, DateTimeOffset At); + private record SchedulerMessage( + string Id, + DateTimeOffset At, + SchedulerFireType FireType, + string MessageType, + string MessageData); private class SchedulerMessageCollection { diff --git a/src/Paramore.Brighter/InMemoryMessageSchedulerFactory.cs b/src/Paramore.Brighter/InMemoryMessageSchedulerFactory.cs index 345df932dd..29a5fbf722 100644 --- a/src/Paramore.Brighter/InMemoryMessageSchedulerFactory.cs +++ b/src/Paramore.Brighter/InMemoryMessageSchedulerFactory.cs @@ -1,6 +1,4 @@ using System; -using System.Collections.Generic; -using System.Transactions; namespace Paramore.Brighter; @@ -11,21 +9,25 @@ public InMemoryMessageSchedulerFactory() { } - private static readonly Dictionary s_schedulers = new(); + public IAmAMessageScheduler Create(IAmACommandProcessor processor) + { + return GetOrCreate(processor, initialDelay, period); + } - public IAmAMessageScheduler Create(IAmAnOutboxProducerMediator mediator) - => Create(mediator, null); + private static readonly object s_lock = new(); + private static InMemoryMessageScheduler? s_scheduler; - public IAmAMessageScheduler Create(IAmAnOutboxProducerMediator mediator, - IAmABoxTransactionProvider? transactionProvider) + private static InMemoryMessageScheduler GetOrCreate(IAmACommandProcessor processor, TimeSpan initialDelay, + TimeSpan period) { - if (!s_schedulers.TryGetValue(typeof(TTransaction), out var scheduler)) + if (s_scheduler == null) { - var consumer = new SchedulerMessageConsumer(mediator, transactionProvider); - scheduler = new InMemoryMessageScheduler(consumer, initialDelay, period); - s_schedulers[typeof(TTransaction)] = scheduler; + lock (s_lock) + { + s_scheduler ??= new InMemoryMessageScheduler(processor, initialDelay, period); + } } - return scheduler; + return s_scheduler; } } diff --git a/src/Paramore.Brighter/Observability/CommandProcessorSpanOperation.cs b/src/Paramore.Brighter/Observability/CommandProcessorSpanOperation.cs index 19f29e8b63..2eaa0e8ac8 100644 --- a/src/Paramore.Brighter/Observability/CommandProcessorSpanOperation.cs +++ b/src/Paramore.Brighter/Observability/CommandProcessorSpanOperation.cs @@ -34,5 +34,6 @@ public enum CommandProcessorSpanOperation Publish = 2, // Publish an event Deposit = 3, // Deposit a message in the outbox Clear = 4, // Clear a message from the outbox - Archive = 5 //Archive a message from the outbox + Archive = 5, //Archive a message from the outbox + Scheduler = 6 } diff --git a/src/Paramore.Brighter/Scheduler/Events/SchedulerMessageFired.cs b/src/Paramore.Brighter/Scheduler/Events/SchedulerMessageFired.cs new file mode 100644 index 0000000000..939d1ee809 --- /dev/null +++ b/src/Paramore.Brighter/Scheduler/Events/SchedulerMessageFired.cs @@ -0,0 +1,17 @@ +namespace Paramore.Brighter.Scheduler.Events; + +// TODO Add doc + +public class SchedulerMessageFired(string id) : Event(id) +{ + public SchedulerFireType FireType { get; set; } = SchedulerFireType.Send; + public string MessageType { get; set; } = string.Empty; + public string MessageData { get; set; } = string.Empty; +} + +public enum SchedulerFireType +{ + Send, + Publish, + Post +} diff --git a/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandler.cs b/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandler.cs new file mode 100644 index 0000000000..3b9d0942fd --- /dev/null +++ b/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandler.cs @@ -0,0 +1,104 @@ +using System; +using System.Collections.Concurrent; +using System.Reflection; +using System.Text.Json; +using Paramore.Brighter.Scheduler.Events; + +namespace Paramore.Brighter.Scheduler.Handlers; + +// public class SchedulerMessageFiredHandler(IAmACommandProcessor processor) : RequestHandler +// { +// private static readonly ConcurrentDictionary s_types = new(); +// +// private static readonly MethodInfo s_sendMethod = typeof(SchedulerMessageFiredHandler) +// .GetMethod(nameof(Send), BindingFlags.Static | BindingFlags.NonPublic)!; +// +// private static readonly MethodInfo s_publishMethod = typeof(SchedulerMessageFiredHandler) +// .GetMethod(nameof(Publish), BindingFlags.Static | BindingFlags.NonPublic)!; +// +// private static readonly MethodInfo s_postMethod = typeof(SchedulerMessageFiredHandler) +// .GetMethod(nameof(Post), BindingFlags.Static | BindingFlags.NonPublic)!; +// +// private static readonly ConcurrentDictionary> s_send = new(); +// private static readonly ConcurrentDictionary> s_publish = new(); +// private static readonly ConcurrentDictionary> s_post = new(); +// +// public override SchedulerMessageFired Handle(SchedulerMessageFired command) +// { +// var type = s_types.GetOrAdd(command.MessageType, CreateType); +// if (command.FireType == SchedulerFireType.Send) +// { +// var send = s_send.GetOrAdd(type, CreateSend); +// send(processor, command.MessageData); +// } +// else if (command.FireType == SchedulerFireType.Publish) +// { +// var publish = s_publish.GetOrAdd(type, CreatePublish); +// publish(processor, command.MessageData); +// } +// else +// { +// var publish = s_publish.GetOrAdd(type, CreatePost); +// publish(processor, command.MessageData); +// } +// +// return base.Handle(command); +// } +// +// private static Type CreateType(string messageType) +// { +// var type = Type.GetType(messageType); +// if (type == null) +// { +// throw new InvalidOperationException($"The message type doesn't exits: '{messageType}'"); +// } +// +// return type; +// } +// +// private static void Send(IAmACommandProcessor commandProcessor, string data) +// where TRequest : class, IRequest +// { +// var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; +// commandProcessor.Send(request); +// } +// +// private static Action CreateSend(Type type) +// { +// var method = s_sendMethod.MakeGenericMethod(type); +// var action = (Action)method +// .CreateDelegate(typeof(Action)); +// return action; +// } +// +// private static void Publish(IAmACommandProcessor commandProcessor, string data) +// where TRequest : class, IRequest +// { +// var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; +// commandProcessor.Publish(request); +// } +// +// private static Action CreatePublish(Type type) +// { +// var method = s_publishMethod.MakeGenericMethod(type); +// var action = (Action)method +// .CreateDelegate(typeof(Action)); +// return action; +// } +// +// +// private static void Post(IAmACommandProcessor commandProcessor, string data) +// where TRequest : class, IRequest +// { +// var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; +// commandProcessor.Post(request); +// } +// +// private static Action CreatePost(Type type) +// { +// var method = s_postMethod.MakeGenericMethod(type); +// var action = (Action)method +// .CreateDelegate(typeof(Action)); +// return action; +// } +// } diff --git a/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandlerAsync.cs b/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandlerAsync.cs new file mode 100644 index 0000000000..c6ced1badf --- /dev/null +++ b/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandlerAsync.cs @@ -0,0 +1,123 @@ +using System; +using System.Collections.Concurrent; +using System.Reflection; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Paramore.Brighter.Scheduler.Events; + +namespace Paramore.Brighter.Scheduler.Handlers; + +public class SchedulerMessageFiredHandlerAsync(IAmACommandProcessor processor) + : RequestHandlerAsync +{ + private static readonly ConcurrentDictionary s_types = new(); + + private static readonly MethodInfo s_sendMethod = typeof(SchedulerMessageFiredHandlerAsync) + .GetMethod(nameof(SendAsync), BindingFlags.Static | BindingFlags.NonPublic)!; + + private static readonly MethodInfo s_publishMethod = typeof(SchedulerMessageFiredHandlerAsync) + .GetMethod(nameof(PublishAsync), BindingFlags.Static | BindingFlags.NonPublic)!; + + private static readonly MethodInfo s_postMethod = typeof(SchedulerMessageFiredHandlerAsync) + .GetMethod(nameof(PostAsync), BindingFlags.Static | BindingFlags.NonPublic)!; + + private static readonly ConcurrentDictionary> + s_send = new(); + + private static readonly ConcurrentDictionary> + s_publish = new(); + + private static readonly ConcurrentDictionary> + s_post = new(); + + public override async Task HandleAsync(SchedulerMessageFired command, + CancellationToken cancellationToken = default) + { + var type = s_types.GetOrAdd(command.MessageType, CreateType); + if (command.FireType == SchedulerFireType.Send) + { + var send = s_send.GetOrAdd(type, CreateSend); + await send(processor, command.MessageData, cancellationToken); + } + else if (command.FireType == SchedulerFireType.Publish) + { + var publish = s_publish.GetOrAdd(type, CreatePublish); + await publish(processor, command.MessageData, cancellationToken); + } + else + { + var publish = s_post.GetOrAdd(type, CreatePost); + await publish(processor, command.MessageData, cancellationToken); + } + + return await base.HandleAsync(command, cancellationToken); + } + + private static Type CreateType(string messageType) + { + var assemblies = AppDomain.CurrentDomain.GetAssemblies(); + foreach (var assembly in assemblies) + { + var type = assembly.GetType(messageType); + if (type != null) + { + return type; + } + } + + throw new InvalidOperationException($"The message type could not be found: '{messageType}'"); + } + + private static async Task SendAsync(IAmACommandProcessor commandProcessor, + string data, + CancellationToken cancellationToken = default) + where TRequest : class, IRequest + { + var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; + await commandProcessor.SendAsync(request, cancellationToken: cancellationToken); + } + + private static Func CreateSend(Type type) + { + var method = s_sendMethod.MakeGenericMethod(type); + var action = (Func)method + .CreateDelegate(typeof(Func)); + return action; + } + + private static async Task PublishAsync(IAmACommandProcessor commandProcessor, + string data, + CancellationToken cancellationToken) + where TRequest : class, IRequest + { + var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; + await commandProcessor.PublishAsync(request, cancellationToken: cancellationToken); + } + + private static Func CreatePublish(Type type) + { + var method = s_publishMethod.MakeGenericMethod(type); + var action = (Func)method + .CreateDelegate(typeof(Func)); + return action; + } + + + private static async Task PostAsync(IAmACommandProcessor commandProcessor, + string data, + CancellationToken cancellationToken) + where TRequest : class, IRequest + { + var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; + await commandProcessor.PostAsync(request, requestContext: new RequestContext(), cancellationToken: cancellationToken); + } + + private static Func CreatePost(Type type) + { + var method = s_postMethod.MakeGenericMethod(type); + var action = (Func)method + .CreateDelegate(typeof(Func)); + return action; + } +} diff --git a/src/Paramore.Brighter/SchedulerMessageConsumer.cs b/src/Paramore.Brighter/SchedulerMessageConsumer.cs deleted file mode 100644 index c05b97164b..0000000000 --- a/src/Paramore.Brighter/SchedulerMessageConsumer.cs +++ /dev/null @@ -1,40 +0,0 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -using Paramore.Brighter.Logging; - -namespace Paramore.Brighter; - -public class SchedulerMessageConsumer( - IAmAnOutboxProducerMediator mediator, - IAmABoxTransactionProvider? transactionProvider) : - IAmASchedulerMessageConsumerSync, - IAmASchedulerMessageConsumerAsync -{ - private static readonly ILogger s_logger = ApplicationLogging.CreateLogger>(); - public async Task ConsumeAsync(Message message, RequestContext context) - { - s_logger.LogInformation("Publishing scheduler message"); - if (!mediator.HasOutbox()) - { - throw new InvalidOperationException("No outbox defined."); - } - - var outbox = (IAmAnOutboxProducerMediator)mediator; - await outbox.AddToOutboxAsync(message, context, transactionProvider); - await outbox.ClearOutboxAsync([message.Id], context); - } - - public void Consume(Message message, RequestContext context) - { - s_logger.LogInformation("Publishing scheduler message"); - if (!mediator.HasOutbox()) - { - throw new InvalidOperationException("No outbox defined."); - } - - var outbox = (IAmAnOutboxProducerMediator)mediator; - outbox.AddToOutbox(message, context, transactionProvider); - outbox.ClearOutbox([message.Id], context); - } -} From 8fa4a58d654120cd8b1ab240a2fb775dc59195b4 Mon Sep 17 00:00:00 2001 From: Rafael Andrade Date: Mon, 20 Jan 2025 14:14:08 +0000 Subject: [PATCH 4/6] Improve scheduler --- .../InMemoryMessageScheduler.cs | 13 ++- .../Scheduler/Events/SchedulerMessageFired.cs | 1 + .../Handlers/SchedulerMessageFiredHandler.cs | 104 ----------------- .../SchedulerMessageFiredHandlerAsync.cs | 109 ++++++------------ 4 files changed, 45 insertions(+), 182 deletions(-) delete mode 100644 src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandler.cs diff --git a/src/Paramore.Brighter/InMemoryMessageScheduler.cs b/src/Paramore.Brighter/InMemoryMessageScheduler.cs index 58fcc20faa..910e180b21 100644 --- a/src/Paramore.Brighter/InMemoryMessageScheduler.cs +++ b/src/Paramore.Brighter/InMemoryMessageScheduler.cs @@ -30,11 +30,13 @@ private static void Consume(object? state) var schedulerMessage = scheduler._messages.Next(now); while (schedulerMessage != null) { - BrighterAsyncContext.Run(async () => await scheduler._processor.SendAsync(new SchedulerMessageFired(schedulerMessage.Id) + var tmp = schedulerMessage; + BrighterAsyncContext.Run(async () => await scheduler._processor.SendAsync(new SchedulerMessageFired(tmp.Id) { - FireType = schedulerMessage.FireType, - MessageType = schedulerMessage.MessageType, - MessageData = schedulerMessage.MessageData, + FireType = tmp.FireType, + MessageType = tmp.MessageType, + MessageData = tmp.MessageData, + UseAsync = tmp.UseAsync })); // TODO Add log @@ -46,7 +48,7 @@ public string Schedule(DateTimeOffset at, SchedulerFireType fireType, where TRequest : class, IRequest { var id = Guid.NewGuid().ToString(); - _messages.Add(new SchedulerMessage(id, at, fireType, + _messages.Add(new SchedulerMessage(id, at, fireType, false, typeof(TRequest).FullName!, JsonSerializer.Serialize(request, JsonSerialisationOptions.Options))); return id; @@ -65,6 +67,7 @@ private record SchedulerMessage( string Id, DateTimeOffset At, SchedulerFireType FireType, + bool UseAsync, string MessageType, string MessageData); diff --git a/src/Paramore.Brighter/Scheduler/Events/SchedulerMessageFired.cs b/src/Paramore.Brighter/Scheduler/Events/SchedulerMessageFired.cs index 939d1ee809..4684f4e63c 100644 --- a/src/Paramore.Brighter/Scheduler/Events/SchedulerMessageFired.cs +++ b/src/Paramore.Brighter/Scheduler/Events/SchedulerMessageFired.cs @@ -4,6 +4,7 @@ public class SchedulerMessageFired(string id) : Event(id) { + public bool UseAsync { get; set; } public SchedulerFireType FireType { get; set; } = SchedulerFireType.Send; public string MessageType { get; set; } = string.Empty; public string MessageData { get; set; } = string.Empty; diff --git a/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandler.cs b/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandler.cs deleted file mode 100644 index 3b9d0942fd..0000000000 --- a/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandler.cs +++ /dev/null @@ -1,104 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Reflection; -using System.Text.Json; -using Paramore.Brighter.Scheduler.Events; - -namespace Paramore.Brighter.Scheduler.Handlers; - -// public class SchedulerMessageFiredHandler(IAmACommandProcessor processor) : RequestHandler -// { -// private static readonly ConcurrentDictionary s_types = new(); -// -// private static readonly MethodInfo s_sendMethod = typeof(SchedulerMessageFiredHandler) -// .GetMethod(nameof(Send), BindingFlags.Static | BindingFlags.NonPublic)!; -// -// private static readonly MethodInfo s_publishMethod = typeof(SchedulerMessageFiredHandler) -// .GetMethod(nameof(Publish), BindingFlags.Static | BindingFlags.NonPublic)!; -// -// private static readonly MethodInfo s_postMethod = typeof(SchedulerMessageFiredHandler) -// .GetMethod(nameof(Post), BindingFlags.Static | BindingFlags.NonPublic)!; -// -// private static readonly ConcurrentDictionary> s_send = new(); -// private static readonly ConcurrentDictionary> s_publish = new(); -// private static readonly ConcurrentDictionary> s_post = new(); -// -// public override SchedulerMessageFired Handle(SchedulerMessageFired command) -// { -// var type = s_types.GetOrAdd(command.MessageType, CreateType); -// if (command.FireType == SchedulerFireType.Send) -// { -// var send = s_send.GetOrAdd(type, CreateSend); -// send(processor, command.MessageData); -// } -// else if (command.FireType == SchedulerFireType.Publish) -// { -// var publish = s_publish.GetOrAdd(type, CreatePublish); -// publish(processor, command.MessageData); -// } -// else -// { -// var publish = s_publish.GetOrAdd(type, CreatePost); -// publish(processor, command.MessageData); -// } -// -// return base.Handle(command); -// } -// -// private static Type CreateType(string messageType) -// { -// var type = Type.GetType(messageType); -// if (type == null) -// { -// throw new InvalidOperationException($"The message type doesn't exits: '{messageType}'"); -// } -// -// return type; -// } -// -// private static void Send(IAmACommandProcessor commandProcessor, string data) -// where TRequest : class, IRequest -// { -// var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; -// commandProcessor.Send(request); -// } -// -// private static Action CreateSend(Type type) -// { -// var method = s_sendMethod.MakeGenericMethod(type); -// var action = (Action)method -// .CreateDelegate(typeof(Action)); -// return action; -// } -// -// private static void Publish(IAmACommandProcessor commandProcessor, string data) -// where TRequest : class, IRequest -// { -// var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; -// commandProcessor.Publish(request); -// } -// -// private static Action CreatePublish(Type type) -// { -// var method = s_publishMethod.MakeGenericMethod(type); -// var action = (Action)method -// .CreateDelegate(typeof(Action)); -// return action; -// } -// -// -// private static void Post(IAmACommandProcessor commandProcessor, string data) -// where TRequest : class, IRequest -// { -// var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; -// commandProcessor.Post(request); -// } -// -// private static Action CreatePost(Type type) -// { -// var method = s_postMethod.MakeGenericMethod(type); -// var action = (Action)method -// .CreateDelegate(typeof(Action)); -// return action; -// } -// } diff --git a/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandlerAsync.cs b/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandlerAsync.cs index c6ced1badf..689b353f51 100644 --- a/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandlerAsync.cs +++ b/src/Paramore.Brighter/Scheduler/Handlers/SchedulerMessageFiredHandlerAsync.cs @@ -13,43 +13,20 @@ public class SchedulerMessageFiredHandlerAsync(IAmACommandProcessor processor) { private static readonly ConcurrentDictionary s_types = new(); - private static readonly MethodInfo s_sendMethod = typeof(SchedulerMessageFiredHandlerAsync) - .GetMethod(nameof(SendAsync), BindingFlags.Static | BindingFlags.NonPublic)!; + private static readonly MethodInfo s_executeAsyncMethod = typeof(SchedulerMessageFiredHandlerAsync) + .GetMethod(nameof(ExecuteAsync), BindingFlags.Static | BindingFlags.NonPublic)!; - private static readonly MethodInfo s_publishMethod = typeof(SchedulerMessageFiredHandlerAsync) - .GetMethod(nameof(PublishAsync), BindingFlags.Static | BindingFlags.NonPublic)!; - - private static readonly MethodInfo s_postMethod = typeof(SchedulerMessageFiredHandlerAsync) - .GetMethod(nameof(PostAsync), BindingFlags.Static | BindingFlags.NonPublic)!; - - private static readonly ConcurrentDictionary> - s_send = new(); - - private static readonly ConcurrentDictionary> - s_publish = new(); - - private static readonly ConcurrentDictionary> - s_post = new(); + private static readonly ConcurrentDictionary> + s_executeAsync = new(); public override async Task HandleAsync(SchedulerMessageFired command, CancellationToken cancellationToken = default) { var type = s_types.GetOrAdd(command.MessageType, CreateType); - if (command.FireType == SchedulerFireType.Send) - { - var send = s_send.GetOrAdd(type, CreateSend); - await send(processor, command.MessageData, cancellationToken); - } - else if (command.FireType == SchedulerFireType.Publish) - { - var publish = s_publish.GetOrAdd(type, CreatePublish); - await publish(processor, command.MessageData, cancellationToken); - } - else - { - var publish = s_post.GetOrAdd(type, CreatePost); - await publish(processor, command.MessageData, cancellationToken); - } + + var execute = s_executeAsync.GetOrAdd(type, CreateExecuteAsync); + await execute(processor, command.MessageData, command.UseAsync, command.FireType, cancellationToken); return await base.HandleAsync(command, cancellationToken); } @@ -69,55 +46,41 @@ private static Type CreateType(string messageType) throw new InvalidOperationException($"The message type could not be found: '{messageType}'"); } - private static async Task SendAsync(IAmACommandProcessor commandProcessor, - string data, - CancellationToken cancellationToken = default) - where TRequest : class, IRequest - { - var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; - await commandProcessor.SendAsync(request, cancellationToken: cancellationToken); - } - - private static Func CreateSend(Type type) - { - var method = s_sendMethod.MakeGenericMethod(type); - var action = (Func)method - .CreateDelegate(typeof(Func)); - return action; - } - - private static async Task PublishAsync(IAmACommandProcessor commandProcessor, + private static ValueTask ExecuteAsync(IAmACommandProcessor commandProcessor, string data, + bool async, + SchedulerFireType fireType, CancellationToken cancellationToken) where TRequest : class, IRequest { var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; - await commandProcessor.PublishAsync(request, cancellationToken: cancellationToken); - } - - private static Func CreatePublish(Type type) - { - var method = s_publishMethod.MakeGenericMethod(type); - var action = (Func)method - .CreateDelegate(typeof(Func)); - return action; - } - - - private static async Task PostAsync(IAmACommandProcessor commandProcessor, - string data, - CancellationToken cancellationToken) - where TRequest : class, IRequest - { - var request = JsonSerializer.Deserialize(data, JsonSerialisationOptions.Options)!; - await commandProcessor.PostAsync(request, requestContext: new RequestContext(), cancellationToken: cancellationToken); + switch (fireType) + { + case SchedulerFireType.Send when async: + return new ValueTask(commandProcessor.SendAsync(request, cancellationToken: cancellationToken)); + case SchedulerFireType.Send: + commandProcessor.Send(request); + return new ValueTask(); + case SchedulerFireType.Publish when async: + return new ValueTask(commandProcessor.PublishAsync(request, cancellationToken: cancellationToken)); + case SchedulerFireType.Publish: + commandProcessor.Publish(request); + return new ValueTask(); + case SchedulerFireType.Post when async: + return new ValueTask(commandProcessor.PostAsync(request, cancellationToken: cancellationToken)); + default: + commandProcessor.Post(request); + return new ValueTask(); + } } - private static Func CreatePost(Type type) + private static Func + CreateExecuteAsync(Type type) { - var method = s_postMethod.MakeGenericMethod(type); - var action = (Func)method - .CreateDelegate(typeof(Func)); - return action; + var method = s_executeAsyncMethod.MakeGenericMethod(type); + var func = (Func)method + .CreateDelegate( + typeof(Func)); + return func; } } From 0c5416332ac85844098484dfa42720fecd35acfa Mon Sep 17 00:00:00 2001 From: Rafael Andrade Date: Mon, 20 Jan 2025 14:48:22 +0000 Subject: [PATCH 5/6] Add ADR --- docs/adr/0024-Scheduling.md | 89 +++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 docs/adr/0024-Scheduling.md diff --git a/docs/adr/0024-Scheduling.md b/docs/adr/0024-Scheduling.md new file mode 100644 index 0000000000..fef0235691 --- /dev/null +++ b/docs/adr/0024-Scheduling.md @@ -0,0 +1,89 @@ +# 24. Scoping dependencies inline with lifetime scope + +Date: 2025-01-20 + +## Status + +Proposed + +## Context + +Adding the ability to schedule message (by providing `TimeSpan` or `DateTimeOffset`) give to user flexibility to `Send`, `Publis` and `Post`. + + + +## Decision + +Giving support to schedule message, it's necessary breaking on `IAmACommandProcessor` by adding these methods: + +```c# +public interface IAmACommandProcessor +{ + string SchedulerSend(TimeSpan delay, TRequest request) where TRequest : class, IRequest; + string SchedulerSend(DateTimeOffset delay, TRequest request) where TRequest : class, IRequest; + Task SchedulerSendAsync(TimeSpan delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest; + Task SchedulerSendAsync(DateTimeOffset delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest; + + string SchedulerPublish(TimeSpan delay, TRequest request) where TRequest : class, IRequest; + string SchedulerPublish(DateTimeOffset delay, TRequest request) where TRequest : class, IRequest; + Task SchedulerPublishAsync(TimeSpan delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest; + Task SchedulerPublishsync(DateTimeOffset delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest; + + string SchedulerPost(TimeSpan delay, TRequest request) where TRequest : class, IRequest; + string SchedulerPost(DateTimeOffset delay, TRequest request) where TRequest : class, IRequest; + Task SchedulerPostAsync(TimeSpan delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest; + Task SchedulerPostAsync(DateTimeOffset delay, TRequest request, bool continueOnCapturedContext = true, CancellationToken cancellationToken = default) where TRequest : class, IRequest; +} +``` + +Scheduling can be break into 2 part (Producer & Consumer): +- Producer -> Producing a message we are going to have a new interface: + +```c# +public interface IAmAMessageScheduler +{ +} + + +public interface IAmAMessageSchedulerAsync : IAmAMessageScheduler, IDisposable +{ + Task ScheduleAsync(DateTimeOffset at, SchedulerFireType fireType, TRequest request, CancellationToken cancellationToken = default) where TRequest : class, IRequest; + Task ScheduleAsync(TimeSpan delay, SchedulerFireType fireType, TRequest request, CancellationToken cancellationToken = default) where TRequest : class, IRequest; + Task CancelSchedulerAsync(string id, CancellationToken cancellationToken = default); +} + + +public interface IAmAMessageSchedulerSync : IAmAMessageScheduler, IDisposable +{ + string Schedule(DateTimeOffset at, SchedulerFireType fireType, TRequest request) where TRequest : class, IRequest; + string Schedule(TimeSpan delay, SchedulerFireType fireType, TRequest request) where TRequest : class, IRequest; + void CancelScheduler(string id); +} +``` + +- Consumer -> To avoid duplication code we are going to introduce a new message and have a handler for that: + +```c# +public class SchedulerMessageFired : Event +{ + ..... +} + + +public class SchedulerMessageFiredHandlerAsync(IAmACommandProcessor processor) : RequestHandlerAsync +{ + .... +} +``` + +So on Scheduler implementation we need to send the SchedulerMessageFired + +```c# +public class JobExecute(IAmACommandProcessor processor) +{ + public async Task ExecuteAsync(Arg arg) + { + await processor.SendAsync(new SchedulerMessageFired{ ... }); + } +} +``` \ No newline at end of file From 42b904cc8619403b4d069752505aaec62802997b Mon Sep 17 00:00:00 2001 From: Rafael Andrade Date: Mon, 20 Jan 2025 14:53:12 +0000 Subject: [PATCH 6/6] rollback unnecessary changes --- docker-compose-rmq.yaml | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/docker-compose-rmq.yaml b/docker-compose-rmq.yaml index 5b2b5f43d0..a2846c8a1a 100644 --- a/docker-compose-rmq.yaml +++ b/docker-compose-rmq.yaml @@ -1,7 +1,16 @@ +version: '3' + services: rabbitmq: - image: masstransit/rabbitmq + image: brightercommand/rabbitmq:latest platform: linux/arm64 ports: - "5672:5672" - "15672:15672" + volumes: + - rabbitmq-home:/var/lib/rabbitmq + +volumes: + rabbitmq-home: + driver: local + \ No newline at end of file