diff --git a/Directory.Packages.props b/Directory.Packages.props index e249a871f..f4f67b8e5 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -9,12 +9,12 @@ - - + + - - - + + + @@ -23,8 +23,8 @@ - - + + diff --git a/examples/AI/ConversationalAI/ConversationalAI.csproj b/examples/AI/ConversationalAI/ConversationalAI.csproj index 976265a5c..182363d2e 100644 --- a/examples/AI/ConversationalAI/ConversationalAI.csproj +++ b/examples/AI/ConversationalAI/ConversationalAI.csproj @@ -1,7 +1,6 @@  - net8.0 enable enable diff --git a/examples/Actor/ActorClient/ActorClient.csproj b/examples/Actor/ActorClient/ActorClient.csproj index 0d1d94f55..43aa659b1 100644 --- a/examples/Actor/ActorClient/ActorClient.csproj +++ b/examples/Actor/ActorClient/ActorClient.csproj @@ -2,7 +2,6 @@ Exe - net6 diff --git a/examples/Actor/DemoActor/DemoActor.csproj b/examples/Actor/DemoActor/DemoActor.csproj index 24a42ee0e..4ed29d66e 100644 --- a/examples/Actor/DemoActor/DemoActor.csproj +++ b/examples/Actor/DemoActor/DemoActor.csproj @@ -1,9 +1,5 @@  - - net6 - - true true diff --git a/examples/Actor/IDemoActor/IDemoActor.csproj b/examples/Actor/IDemoActor/IDemoActor.csproj index 9f7744796..8b8a84b21 100644 --- a/examples/Actor/IDemoActor/IDemoActor.csproj +++ b/examples/Actor/IDemoActor/IDemoActor.csproj @@ -1,9 +1,5 @@  - - net6 - - diff --git a/examples/AspNetCore/ControllerSample/ControllerSample.csproj b/examples/AspNetCore/ControllerSample/ControllerSample.csproj index 6dbe750a6..061510f94 100644 --- a/examples/AspNetCore/ControllerSample/ControllerSample.csproj +++ b/examples/AspNetCore/ControllerSample/ControllerSample.csproj @@ -1,9 +1,5 @@  - - net6 - - diff --git a/examples/AspNetCore/GrpcServiceSample/GrpcServiceSample.csproj b/examples/AspNetCore/GrpcServiceSample/GrpcServiceSample.csproj index 2319f6a56..5e1a27edf 100644 --- a/examples/AspNetCore/GrpcServiceSample/GrpcServiceSample.csproj +++ b/examples/AspNetCore/GrpcServiceSample/GrpcServiceSample.csproj @@ -1,7 +1,6 @@ - net6 true diff --git a/examples/AspNetCore/RoutingSample/RoutingSample.csproj b/examples/AspNetCore/RoutingSample/RoutingSample.csproj index 6dbe750a6..061510f94 100644 --- a/examples/AspNetCore/RoutingSample/RoutingSample.csproj +++ b/examples/AspNetCore/RoutingSample/RoutingSample.csproj @@ -1,9 +1,5 @@  - - net6 - - diff --git a/examples/AspNetCore/SecretStoreConfigurationProviderSample/SecretStoreConfigurationProviderSample.csproj b/examples/AspNetCore/SecretStoreConfigurationProviderSample/SecretStoreConfigurationProviderSample.csproj index 01fbc2079..ce47841be 100644 --- a/examples/AspNetCore/SecretStoreConfigurationProviderSample/SecretStoreConfigurationProviderSample.csproj +++ b/examples/AspNetCore/SecretStoreConfigurationProviderSample/SecretStoreConfigurationProviderSample.csproj @@ -2,7 +2,6 @@ Exe - net6 diff --git a/examples/Client/ConfigurationApi/ConfigurationApi.csproj b/examples/Client/ConfigurationApi/ConfigurationApi.csproj index 761ebb38f..650a294c0 100644 --- a/examples/Client/ConfigurationApi/ConfigurationApi.csproj +++ b/examples/Client/ConfigurationApi/ConfigurationApi.csproj @@ -1,10 +1,5 @@ - - net6 - - - diff --git a/examples/Client/Cryptography/Cryptography.csproj b/examples/Client/Cryptography/Cryptography.csproj index 525c38562..1a1b1eeb4 100644 --- a/examples/Client/Cryptography/Cryptography.csproj +++ b/examples/Client/Cryptography/Cryptography.csproj @@ -2,7 +2,6 @@ Exe - net6.0 enable enable latest diff --git a/examples/Client/Cryptography/Examples/EncryptDecryptFileStreamExample.cs b/examples/Client/Cryptography/Examples/EncryptDecryptFileStreamExample.cs index 19df06345..89b55dfb8 100644 --- a/examples/Client/Cryptography/Examples/EncryptDecryptFileStreamExample.cs +++ b/examples/Client/Cryptography/Examples/EncryptDecryptFileStreamExample.cs @@ -37,9 +37,8 @@ public override async Task RunAsync(CancellationToken cancellationToken) await using var encryptFs = new FileStream(fileName, FileMode.Open); var bufferedEncryptedBytes = new ArrayBufferWriter(); - await foreach (var bytes in (await client.EncryptAsync(componentName, encryptFs, keyName, - new EncryptionOptions(KeyWrapAlgorithm.Rsa), cancellationToken)) - .WithCancellation(cancellationToken)) + await foreach (var bytes in (client.EncryptAsync(componentName, encryptFs, keyName, + new EncryptionOptions(KeyWrapAlgorithm.Rsa), cancellationToken))) { bufferedEncryptedBytes.Write(bytes.Span); } @@ -53,8 +52,8 @@ public override async Task RunAsync(CancellationToken cancellationToken) //We'll stream the decrypted bytes from a MemoryStream into the above temporary file await using var encryptedMs = new MemoryStream(bufferedEncryptedBytes.WrittenMemory.ToArray()); - await foreach (var result in (await client.DecryptAsync(componentName, encryptedMs, keyName, - cancellationToken)).WithCancellation(cancellationToken)) + await foreach (var result in (client.DecryptAsync(componentName, encryptedMs, keyName, + cancellationToken))) { decryptFs.Write(result.Span); } diff --git a/examples/Client/DistributedLock/DistributedLock.csproj b/examples/Client/DistributedLock/DistributedLock.csproj index 4c04fb907..45e0f42d6 100644 --- a/examples/Client/DistributedLock/DistributedLock.csproj +++ b/examples/Client/DistributedLock/DistributedLock.csproj @@ -6,9 +6,4 @@ - - net6 - - - diff --git a/examples/Client/PublishSubscribe/BulkPublishEventExample/BulkPublishEventExample.csproj b/examples/Client/PublishSubscribe/BulkPublishEventExample/BulkPublishEventExample.csproj index b1e7647c7..bccce505e 100644 --- a/examples/Client/PublishSubscribe/BulkPublishEventExample/BulkPublishEventExample.csproj +++ b/examples/Client/PublishSubscribe/BulkPublishEventExample/BulkPublishEventExample.csproj @@ -2,7 +2,6 @@ Exe - net6 Samples.Client enable diff --git a/examples/Client/PublishSubscribe/PublishEventExample/PublishEventExample.csproj b/examples/Client/PublishSubscribe/PublishEventExample/PublishEventExample.csproj index 52b77a3e5..93500fa1f 100644 --- a/examples/Client/PublishSubscribe/PublishEventExample/PublishEventExample.csproj +++ b/examples/Client/PublishSubscribe/PublishEventExample/PublishEventExample.csproj @@ -2,7 +2,6 @@ Exe - net6 Samples.Client enable diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj index 4ad620d00..77e84dbbf 100644 --- a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj +++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj @@ -2,7 +2,6 @@ Exe - net6.0 enable enable diff --git a/examples/Client/ServiceInvocation/ServiceInvocation.csproj b/examples/Client/ServiceInvocation/ServiceInvocation.csproj index 7b165835e..e292e4aeb 100644 --- a/examples/Client/ServiceInvocation/ServiceInvocation.csproj +++ b/examples/Client/ServiceInvocation/ServiceInvocation.csproj @@ -2,7 +2,6 @@ Exe - net6 Samples.Client enable diff --git a/examples/Client/StateManagement/StateManagement.csproj b/examples/Client/StateManagement/StateManagement.csproj index 7b165835e..e292e4aeb 100644 --- a/examples/Client/StateManagement/StateManagement.csproj +++ b/examples/Client/StateManagement/StateManagement.csproj @@ -2,7 +2,6 @@ Exe - net6 Samples.Client enable diff --git a/examples/Directory.Build.props b/examples/Directory.Build.props index 00ec8e6c6..bf030c210 100644 --- a/examples/Directory.Build.props +++ b/examples/Directory.Build.props @@ -1,10 +1,11 @@ - + - - - $(RepoRoot)bin\$(Configuration)\examples\$(MSBuildProjectName)\ + + net6;net7;net8;net9 + + $(RepoRoot)bin\$(Configuration)\examples\$(MSBuildProjectName)\ - false - + false + \ No newline at end of file diff --git a/examples/GeneratedActor/ActorClient/ActorClient.csproj b/examples/GeneratedActor/ActorClient/ActorClient.csproj index 88f75663d..a51495a9f 100644 --- a/examples/GeneratedActor/ActorClient/ActorClient.csproj +++ b/examples/GeneratedActor/ActorClient/ActorClient.csproj @@ -2,7 +2,6 @@ Exe - net6 10.0 enable enable diff --git a/examples/GeneratedActor/ActorCommon/ActorCommon.csproj b/examples/GeneratedActor/ActorCommon/ActorCommon.csproj index 2cbc61e2c..dbbf1c40f 100644 --- a/examples/GeneratedActor/ActorCommon/ActorCommon.csproj +++ b/examples/GeneratedActor/ActorCommon/ActorCommon.csproj @@ -1,7 +1,6 @@ - net6 10.0 enable enable diff --git a/examples/GeneratedActor/ActorService/ActorService.csproj b/examples/GeneratedActor/ActorService/ActorService.csproj index a74104363..e87bafc5e 100644 --- a/examples/GeneratedActor/ActorService/ActorService.csproj +++ b/examples/GeneratedActor/ActorService/ActorService.csproj @@ -1,7 +1,6 @@ - net6 10.0 enable enable diff --git a/examples/Jobs/JobsSample/JobsSample.csproj b/examples/Jobs/JobsSample/JobsSample.csproj index 4663d1d5b..642c299ef 100644 --- a/examples/Jobs/JobsSample/JobsSample.csproj +++ b/examples/Jobs/JobsSample/JobsSample.csproj @@ -1,7 +1,6 @@ - net8.0 enable enable diff --git a/examples/Workflow/WorkflowAsyncOperations/WorkflowAsyncOperations.csproj b/examples/Workflow/WorkflowAsyncOperations/WorkflowAsyncOperations.csproj index a1350fa79..be70a1b25 100644 --- a/examples/Workflow/WorkflowAsyncOperations/WorkflowAsyncOperations.csproj +++ b/examples/Workflow/WorkflowAsyncOperations/WorkflowAsyncOperations.csproj @@ -2,7 +2,6 @@ Exe - net6.0 enable enable diff --git a/examples/Workflow/WorkflowConsoleApp/WorkflowConsoleApp.csproj b/examples/Workflow/WorkflowConsoleApp/WorkflowConsoleApp.csproj index 25c03a419..9b38483c5 100644 --- a/examples/Workflow/WorkflowConsoleApp/WorkflowConsoleApp.csproj +++ b/examples/Workflow/WorkflowConsoleApp/WorkflowConsoleApp.csproj @@ -6,7 +6,6 @@ Exe - net6 enable 612,618 diff --git a/examples/Workflow/WorkflowExternalInteraction/WorkflowExternalInteraction.csproj b/examples/Workflow/WorkflowExternalInteraction/WorkflowExternalInteraction.csproj index 4aae25c46..59d76d46e 100644 --- a/examples/Workflow/WorkflowExternalInteraction/WorkflowExternalInteraction.csproj +++ b/examples/Workflow/WorkflowExternalInteraction/WorkflowExternalInteraction.csproj @@ -2,7 +2,6 @@ Exe - net6.0 enable enable diff --git a/examples/Workflow/WorkflowFanOutFanIn/WorkflowFanOutFanIn.csproj b/examples/Workflow/WorkflowFanOutFanIn/WorkflowFanOutFanIn.csproj index af3a1b2c8..ba92ee6ec 100644 --- a/examples/Workflow/WorkflowFanOutFanIn/WorkflowFanOutFanIn.csproj +++ b/examples/Workflow/WorkflowFanOutFanIn/WorkflowFanOutFanIn.csproj @@ -2,7 +2,6 @@ Exe - net8.0 enable enable diff --git a/examples/Workflow/WorkflowMonitor/WorkflowMonitor.csproj b/examples/Workflow/WorkflowMonitor/WorkflowMonitor.csproj index 91ded8afb..ba92ee6ec 100644 --- a/examples/Workflow/WorkflowMonitor/WorkflowMonitor.csproj +++ b/examples/Workflow/WorkflowMonitor/WorkflowMonitor.csproj @@ -2,7 +2,6 @@ Exe - net6.0 enable enable diff --git a/examples/Workflow/WorkflowSubworkflow/WorkflowSubworkflow.csproj b/examples/Workflow/WorkflowSubworkflow/WorkflowSubworkflow.csproj index af3a1b2c8..ba92ee6ec 100644 --- a/examples/Workflow/WorkflowSubworkflow/WorkflowSubworkflow.csproj +++ b/examples/Workflow/WorkflowSubworkflow/WorkflowSubworkflow.csproj @@ -2,7 +2,6 @@ Exe - net8.0 enable enable diff --git a/examples/Workflow/WorkflowTaskChaining/WorkflowTaskChaining.csproj b/examples/Workflow/WorkflowTaskChaining/WorkflowTaskChaining.csproj index 91ded8afb..ba92ee6ec 100644 --- a/examples/Workflow/WorkflowTaskChaining/WorkflowTaskChaining.csproj +++ b/examples/Workflow/WorkflowTaskChaining/WorkflowTaskChaining.csproj @@ -2,7 +2,6 @@ Exe - net6.0 enable enable diff --git a/examples/Workflow/WorkflowUnitTest/WorkflowUnitTest.csproj b/examples/Workflow/WorkflowUnitTest/WorkflowUnitTest.csproj index dec14a713..d0f391a25 100644 --- a/examples/Workflow/WorkflowUnitTest/WorkflowUnitTest.csproj +++ b/examples/Workflow/WorkflowUnitTest/WorkflowUnitTest.csproj @@ -1,7 +1,6 @@  - net6.0 enable false diff --git a/src/Dapr.AI/Dapr.AI.csproj b/src/Dapr.AI/Dapr.AI.csproj index 8220c5c4d..87ae8bb8c 100644 --- a/src/Dapr.AI/Dapr.AI.csproj +++ b/src/Dapr.AI/Dapr.AI.csproj @@ -1,7 +1,6 @@  - net6;net8 enable enable Dapr.AI diff --git a/src/Dapr.Actors/Extensions/DurationExtensions.cs b/src/Dapr.Actors/Extensions/DurationExtensions.cs index c3f1cafa6..dcd38583a 100644 --- a/src/Dapr.Actors/Extensions/DurationExtensions.cs +++ b/src/Dapr.Actors/Extensions/DurationExtensions.cs @@ -68,13 +68,12 @@ public static TimeSpan FromPrefixedPeriod(this string period) if (period.StartsWith(MonthlyPrefixPeriod)) { - var dateTime = DateTime.UtcNow; - return dateTime.AddMonths(1) - dateTime; + return TimeSpan.FromDays(30); } if (period.StartsWith(MidnightPrefixPeriod)) { - return new TimeSpan(); + return TimeSpan.Zero; } if (period.StartsWith(WeeklyPrefixPeriod)) diff --git a/src/Dapr.Actors/Runtime/ActorManager.cs b/src/Dapr.Actors/Runtime/ActorManager.cs index c78126ccd..5b0768749 100644 --- a/src/Dapr.Actors/Runtime/ActorManager.cs +++ b/src/Dapr.Actors/Runtime/ActorManager.cs @@ -223,7 +223,7 @@ async Task RequestFunc(Actor actor, CancellationToken ct) internal async Task FireTimerAsync(ActorId actorId, Stream requestBodyStream, CancellationToken cancellationToken = default) { #pragma warning disable 0618 - var timerData = await JsonSerializer.DeserializeAsync(requestBodyStream); + var timerData = await DeserializeAsync(requestBodyStream); #pragma warning restore 0618 // Create a Func to be invoked by common method. @@ -243,6 +243,62 @@ async Task RequestFunc(Actor actor, CancellationToken ct) await this.DispatchInternalAsync(actorId, this.timerMethodContext, RequestFunc, cancellationToken); } +#pragma warning disable 0618 + internal static async Task DeserializeAsync(Stream stream) + { + var json = await JsonSerializer.DeserializeAsync(stream); + if (json.ValueKind == JsonValueKind.Null) + { + return null; + } + + var setAnyProperties = false; // Used to determine if anything was actually deserialized + var dueTime = TimeSpan.Zero; + var callback = ""; + var period = TimeSpan.Zero; + var data = Array.Empty(); + TimeSpan? ttl = null; + if (json.TryGetProperty("callback", out var callbackProperty)) + { + setAnyProperties = true; + callback = callbackProperty.GetString(); + } + if (json.TryGetProperty("dueTime", out var dueTimeProperty)) + { + setAnyProperties = true; + var dueTimeString = dueTimeProperty.GetString(); + dueTime = ConverterUtils.ConvertTimeSpanFromDaprFormat(dueTimeString); + } + + if (json.TryGetProperty("period", out var periodProperty)) + { + setAnyProperties = true; + var periodString = periodProperty.GetString(); + (period, _) = ConverterUtils.ConvertTimeSpanValueFromISO8601Format(periodString); + } + + if (json.TryGetProperty("data", out var dataProperty) && dataProperty.ValueKind != JsonValueKind.Null) + { + setAnyProperties = true; + data = dataProperty.GetBytesFromBase64(); + } + + if (json.TryGetProperty("ttl", out var ttlProperty)) + { + setAnyProperties = true; + var ttlString = ttlProperty.GetString(); + ttl = ConverterUtils.ConvertTimeSpanFromDaprFormat(ttlString); + } + + if (!setAnyProperties) + { + return null; //No properties were ever deserialized, so return null instead of default values + } + + return new TimerInfo(callback, data, dueTime, period, ttl); + } +#pragma warning restore 0618 + internal async Task ActivateActorAsync(ActorId actorId) { // An actor is activated by "Dapr" runtime when a call is to be made for an actor. diff --git a/src/Dapr.Actors/Runtime/ConverterUtils.cs b/src/Dapr.Actors/Runtime/ConverterUtils.cs index 94cfd3d35..1705c9fe9 100644 --- a/src/Dapr.Actors/Runtime/ConverterUtils.cs +++ b/src/Dapr.Actors/Runtime/ConverterUtils.cs @@ -49,7 +49,7 @@ public static TimeSpan ConvertTimeSpanFromDaprFormat(string valueString) int msIndex = spanOfValue.IndexOf("ms"); // handle days from hours. - var hoursSpan = spanOfValue.Slice(0, hIndex); + var hoursSpan = spanOfValue[..hIndex]; var hours = int.Parse(hoursSpan); var days = hours / 24; hours %= 24; @@ -103,7 +103,7 @@ public static string ConvertTimeSpanValueInISO8601Format(TimeSpan value, int? re builder.Append($"{value.Days}D"); } - builder.Append("T"); + builder.Append('T'); if(value.Hours > 0) { diff --git a/src/Dapr.Client/Crypto/DecryptionStreamProcessor.cs b/src/Dapr.Client/Crypto/DecryptionStreamProcessor.cs new file mode 100644 index 000000000..4a7748b69 --- /dev/null +++ b/src/Dapr.Client/Crypto/DecryptionStreamProcessor.cs @@ -0,0 +1,153 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +#nullable enable +using System; +using System.Collections.Generic; +using System.IO; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Google.Protobuf; +using Grpc.Core; +using Autogenerated = Dapr.Client.Autogen.Grpc.v1; +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously + +namespace Dapr.Client.Crypto; + +/// +/// Provides the implementation to decrypt a stream of plaintext data with the Dapr runtime. +/// +internal sealed class DecryptionStreamProcessor : IDisposable +{ + private bool disposed; + private readonly Channel> outputChannel = Channel.CreateUnbounded>(); + + /// + /// Surfaces any exceptions encountered while asynchronously processing the inbound and outbound streams. + /// + internal event EventHandler? OnException; + + /// + /// Sends the provided bytes in chunks to the sidecar for the encryption operation. + /// + /// The stream containing the bytes to decrypt. + /// The call to make to the sidecar to process the encryption operation. + /// The size, in bytes, of the streaming blocks. + /// The decryption options. + /// Token used to cancel the ongoing request. + public async Task ProcessStreamAsync( + Stream inputStream, + AsyncDuplexStreamingCall call, + int streamingBlockSizeInBytes, + Autogenerated.DecryptRequestOptions options, + CancellationToken cancellationToken) + { + //Read from the input stream and write to the gRPC call + _ = Task.Run(async () => + { + try + { + await using var bufferedStream = new BufferedStream(inputStream, streamingBlockSizeInBytes); + var buffer = new byte[streamingBlockSizeInBytes]; + int bytesRead; + ulong sequenceNumber = 0; + + while ((bytesRead = await bufferedStream.ReadAsync(buffer, cancellationToken)) > 0) + { + var request = new Autogenerated.DecryptRequest + { + Payload = new Autogenerated.StreamPayload + { + Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber + } + }; + + //Only include the options in the first message + if (sequenceNumber == 0) + { + request.Options = options; + } + + await call.RequestStream.WriteAsync(request, cancellationToken); + + //Increment the sequence number + sequenceNumber++; + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Expected cancellation exception + } + catch (Exception ex) + { + OnException?.Invoke(this, ex); + } + finally + { + await call.RequestStream.CompleteAsync(); + } + }, cancellationToken); + + //Start reading from the gRPC call and writing to the output channel + _ = Task.Run(async () => + { + try + { + await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken)) + { + await outputChannel.Writer.WriteAsync(response.Payload.Data.Memory, cancellationToken); + } + } + catch (Exception ex) + { + OnException?.Invoke(this, ex); + } + finally + { + outputChannel.Writer.Complete(); + } + }, cancellationToken); + } + + /// + /// Retrieves the processed bytes from the operation from the sidecar and + /// returns as an enumerable stream. + /// + public async IAsyncEnumerable> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken) + { + await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken)) + { + yield return data; + } + } + + public void Dispose() + { + Dispose(true); + } + + private void Dispose(bool disposing) + { + if (!disposed) + { + if (disposing) + { + outputChannel.Writer.TryComplete(); + } + + disposed = true; + } + } +} diff --git a/src/Dapr.Client/Crypto/EncryptionStreamProcessor.cs b/src/Dapr.Client/Crypto/EncryptionStreamProcessor.cs new file mode 100644 index 000000000..ae37e776b --- /dev/null +++ b/src/Dapr.Client/Crypto/EncryptionStreamProcessor.cs @@ -0,0 +1,154 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +#nullable enable +using System; +using System.Collections.Generic; +using System.IO; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Google.Protobuf; +using Grpc.Core; +using Autogenerated = Dapr.Client.Autogen.Grpc.v1; +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously + +namespace Dapr.Client.Crypto; + +/// +/// Provides the implementation to encrypt a stream of plaintext data with the Dapr runtime. +/// +internal sealed class EncryptionStreamProcessor : IDisposable +{ + private bool disposed; + private readonly Channel> outputChannel = Channel.CreateUnbounded>(); + + /// + /// Surfaces any exceptions encountered while asynchronously processing the inbound and outbound streams. + /// + internal event EventHandler? OnException; + + /// + /// Sends the provided bytes in chunks to the sidecar for the encryption operation. + /// + /// The stream containing the bytes to encrypt. + /// The call to make to the sidecar to process the encryption operation. + /// The encryption options. + /// The size, in bytes, of the streaming blocks. + /// Token used to cancel the ongoing request. + public async Task ProcessStreamAsync( + Stream inputStream, + AsyncDuplexStreamingCall call, + Autogenerated.EncryptRequestOptions options, + int streamingBlockSizeInBytes, + CancellationToken cancellationToken) + { + //Read from the input stream and write to the gRPC call + _ = Task.Run(async () => + { + try + { + await using var bufferedStream = new BufferedStream(inputStream, streamingBlockSizeInBytes); + var buffer = new byte[streamingBlockSizeInBytes]; + int bytesRead; + ulong sequenceNumber = 0; + + while ((bytesRead = await bufferedStream.ReadAsync(buffer, cancellationToken)) > 0) + { + var request = new Autogenerated.EncryptRequest + { + Payload = new Autogenerated.StreamPayload + { + Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber + } + }; + + //Only include the options in the first message + if (sequenceNumber == 0) + { + request.Options = options; + } + + await call.RequestStream.WriteAsync(request, cancellationToken); + + //Increment the sequence number + sequenceNumber++; + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Expected cancellation exception + } + catch (Exception ex) + { + OnException?.Invoke(this, ex); + } + finally + { + await call.RequestStream.CompleteAsync(); + } + }, cancellationToken); + + //Start reading from the gRPC call and writing to the output channel + _ = Task.Run(async () => + { + try + { + await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken)) + { + await outputChannel.Writer.WriteAsync(response.Payload.Data.Memory, cancellationToken); + } + } + catch (Exception ex) + { + OnException?.Invoke(this, ex); + } + finally + { + outputChannel.Writer.Complete(); + } + }, cancellationToken); + } + + /// + /// Retrieves the processed bytes from the operation from the sidecar and + /// returns as an enumerable stream. + /// + public async IAsyncEnumerable> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken) + { + await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken)) + { + yield return data; + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + private void Dispose(bool disposing) + { + if (!disposed) + { + if (disposing) + { + outputChannel.Writer.TryComplete(); + } + + disposed = true; + } + } +} diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index 6be31a648..913cf109c 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -1105,7 +1105,7 @@ public abstract Task> EncryptAsync(string vaultResourceName /// A that can be used to cancel the operation. /// An array of encrypted bytes. [Obsolete("The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] - public abstract Task>> EncryptAsync(string vaultResourceName, Stream plaintextStream, string keyName, + public abstract IAsyncEnumerable> EncryptAsync(string vaultResourceName, Stream plaintextStream, string keyName, EncryptionOptions encryptionOptions, CancellationToken cancellationToken = default); /// @@ -1144,7 +1144,7 @@ public abstract Task> DecryptAsync(string vaultResourceName /// An asynchronously enumerable array of decrypted bytes. [Obsolete( "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] - public abstract Task>> DecryptAsync(string vaultResourceName, Stream ciphertextStream, + public abstract IAsyncEnumerable> DecryptAsync(string vaultResourceName, Stream ciphertextStream, string keyName, DecryptionOptions options, CancellationToken cancellationToken = default); /// @@ -1157,7 +1157,7 @@ public abstract Task>> DecryptAsync(string /// An asynchronously enumerable array of decrypted bytes. [Obsolete( "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] - public abstract Task>> DecryptAsync(string vaultResourceName, Stream ciphertextStream, + public abstract IAsyncEnumerable> DecryptAsync(string vaultResourceName, Stream ciphertextStream, string keyName, CancellationToken cancellationToken = default); #endregion diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index 394b313e2..dc5bac6a9 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -11,10 +11,10 @@ // limitations under the License. // ------------------------------------------------------------------------ -using Dapr.Common.Extensions; namespace Dapr.Client; +using Crypto; using System; using System.Buffers; using System.Collections.Generic; @@ -23,7 +23,6 @@ namespace Dapr.Client; using System.Net.Http; using System.Net.Http.Json; using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -1665,18 +1664,17 @@ public override async Task UnsubscribeConfigur /// [Obsolete( - "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + "As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")] public override async Task> EncryptAsync(string vaultResourceName, ReadOnlyMemory plaintextBytes, string keyName, EncryptionOptions encryptionOptions, CancellationToken cancellationToken = default) { using var memoryStream = plaintextBytes.CreateMemoryStream(true); - var encryptionResult = - await EncryptAsync(vaultResourceName, memoryStream, keyName, encryptionOptions, cancellationToken); + var encryptionResult = EncryptAsync(vaultResourceName, memoryStream, keyName, encryptionOptions, cancellationToken); var bufferedResult = new ArrayBufferWriter(); - await foreach (var item in encryptionResult.WithCancellation(cancellationToken)) + await foreach (var item in encryptionResult) { bufferedResult.Write(item.Span); } @@ -1686,15 +1684,18 @@ public override async Task> EncryptAsync(string vaultResour /// [Obsolete( - "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] - public override async Task>> EncryptAsync(string vaultResourceName, + "As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")] + public override async IAsyncEnumerable> EncryptAsync(string vaultResourceName, Stream plaintextStream, - string keyName, EncryptionOptions encryptionOptions, CancellationToken cancellationToken = default) + string keyName, EncryptionOptions encryptionOptions, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName)); ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName)); ArgumentVerifier.ThrowIfNull(plaintextStream, nameof(plaintextStream)); ArgumentVerifier.ThrowIfNull(encryptionOptions, nameof(encryptionOptions)); + + EventHandler exceptionHandler = (_, ex) => throw ex; var shouldOmitDecryptionKeyName = string.IsNullOrWhiteSpace(encryptionOptions @@ -1717,185 +1718,91 @@ public override async Task>> EncryptAsync( } var options = CreateCallOptions(headers: null, cancellationToken); - var duplexStream = client.EncryptAlpha1(options); - - //Run both operations at the same time, but return the output of the streaming values coming from the operation - var receiveResult = Task.FromResult(RetrieveEncryptedStreamAsync(duplexStream, cancellationToken)); - return await Task.WhenAll( - //Stream the plaintext data to the sidecar in chunks - SendPlaintextStreamAsync(plaintextStream, encryptionOptions.StreamingBlockSizeInBytes, - duplexStream, encryptRequestOptions, cancellationToken), - //At the same time, retrieve the encrypted response from the sidecar - receiveResult).ContinueWith(_ => receiveResult.Result, cancellationToken); - } + var duplexStream = Client.EncryptAlpha1(options); - /// - /// Sends the plaintext bytes in chunks to the sidecar to be encrypted. - /// - private async Task SendPlaintextStreamAsync(Stream plaintextStream, - int streamingBlockSizeInBytes, - AsyncDuplexStreamingCall duplexStream, - Autogenerated.EncryptRequestOptions encryptRequestOptions, - CancellationToken cancellationToken) - { - //Start with passing the metadata about the encryption request itself in the first message - await duplexStream.RequestStream.WriteAsync( - new Autogenerated.EncryptRequest { Options = encryptRequestOptions }, cancellationToken); - - //Send the plaintext bytes in blocks in subsequent messages - await using (var bufferedStream = new BufferedStream(plaintextStream, streamingBlockSizeInBytes)) + using var streamProcessor = new EncryptionStreamProcessor(); + try { - var buffer = new byte[streamingBlockSizeInBytes]; - int bytesRead; - ulong sequenceNumber = 0; + streamProcessor.OnException += exceptionHandler; + await streamProcessor.ProcessStreamAsync(plaintextStream, duplexStream, encryptRequestOptions, + encryptionOptions.StreamingBlockSizeInBytes, + cancellationToken); - while ((bytesRead = - await bufferedStream.ReadAsync(buffer.AsMemory(0, streamingBlockSizeInBytes), - cancellationToken)) != - 0) + await foreach (var value in streamProcessor.GetProcessedDataAsync(cancellationToken)) { - await duplexStream.RequestStream.WriteAsync( - new Autogenerated.EncryptRequest - { - Payload = new Autogenerated.StreamPayload - { - Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber - } - }, cancellationToken); - - //Increment the sequence number - sequenceNumber++; + yield return value; } } - - //Send the completion message - await duplexStream.RequestStream.CompleteAsync(); - } - - /// - /// Retrieves the encrypted bytes from the encryption operation on the sidecar and returns as an enumerable stream. - /// - private async IAsyncEnumerable> RetrieveEncryptedStreamAsync( - AsyncDuplexStreamingCall duplexStream, - [EnumeratorCancellation] CancellationToken cancellationToken) - { - await foreach (var encryptResponse in duplexStream.ResponseStream.ReadAllAsync(cancellationToken) - .ConfigureAwait(false)) + finally { - yield return encryptResponse.Payload.Data.Memory; + streamProcessor.OnException -= exceptionHandler; } } /// [Obsolete( - "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] - public override async Task>> DecryptAsync(string vaultResourceName, + "As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")] + public override async IAsyncEnumerable> DecryptAsync(string vaultResourceName, Stream ciphertextStream, string keyName, - DecryptionOptions decryptionOptions, CancellationToken cancellationToken = default) + DecryptionOptions decryptionOptions, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName)); ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName)); ArgumentVerifier.ThrowIfNull(ciphertextStream, nameof(ciphertextStream)); - ArgumentVerifier.ThrowIfNull(decryptionOptions, nameof(decryptionOptions)); + decryptionOptions ??= new DecryptionOptions(); + + EventHandler exceptionHandler = (_, ex) => throw ex; var decryptRequestOptions = new Autogenerated.DecryptRequestOptions { - ComponentName = vaultResourceName, KeyName = keyName + ComponentName = vaultResourceName, + KeyName = keyName }; var options = CreateCallOptions(headers: null, cancellationToken); var duplexStream = client.DecryptAlpha1(options); - //Run both operations at the same time, but return the output of the streaming values coming from the operation - var receiveResult = Task.FromResult(RetrieveDecryptedStreamAsync(duplexStream, cancellationToken)); - return await Task.WhenAll( - //Stream the ciphertext data to the sidecar in chunks - SendCiphertextStreamAsync(ciphertextStream, decryptionOptions.StreamingBlockSizeInBytes, - duplexStream, decryptRequestOptions, cancellationToken), - //At the same time, retrieve the decrypted response from the sidecar - receiveResult) - //Return only the result of the `RetrieveEncryptedStreamAsync` method - .ContinueWith(t => receiveResult.Result, cancellationToken); - } - - /// - [Obsolete( - "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] - public override Task>> DecryptAsync(string vaultResourceName, - Stream ciphertextStream, string keyName, CancellationToken cancellationToken = default) => - DecryptAsync(vaultResourceName, ciphertextStream, keyName, new DecryptionOptions(), - cancellationToken); - - /// - /// Sends the ciphertext bytes in chunks to the sidecar to be decrypted. - /// - private async Task SendCiphertextStreamAsync(Stream ciphertextStream, - int streamingBlockSizeInBytes, - AsyncDuplexStreamingCall duplexStream, - Autogenerated.DecryptRequestOptions decryptRequestOptions, - CancellationToken cancellationToken) - { - //Start with passing the metadata about the decryption request itself in the first message - await duplexStream.RequestStream.WriteAsync( - new Autogenerated.DecryptRequest { Options = decryptRequestOptions }, cancellationToken); - - //Send the ciphertext bytes in blocks in subsequent messages - await using (var bufferedStream = new BufferedStream(ciphertextStream, streamingBlockSizeInBytes)) + using var streamProcessor = new DecryptionStreamProcessor(); + try { - var buffer = new byte[streamingBlockSizeInBytes]; - int bytesRead; - ulong sequenceNumber = 0; + streamProcessor.OnException += exceptionHandler; + await streamProcessor.ProcessStreamAsync(ciphertextStream, duplexStream, decryptionOptions.StreamingBlockSizeInBytes, + decryptRequestOptions, + cancellationToken); - while ((bytesRead = - await bufferedStream.ReadAsync(buffer.AsMemory(0, streamingBlockSizeInBytes), - cancellationToken)) != 0) + await foreach (var value in streamProcessor.GetProcessedDataAsync(cancellationToken)) { - await duplexStream.RequestStream.WriteAsync( - new Autogenerated.DecryptRequest - { - Payload = new Autogenerated.StreamPayload - { - Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber - } - }, cancellationToken); - - //Increment the sequence number - sequenceNumber++; + yield return value; } } - - //Send the completion message - await duplexStream.RequestStream.CompleteAsync(); - } - - /// - /// Retrieves the decrypted bytes from the decryption operation on the sidecar and returns as an enumerable stream. - /// - private async IAsyncEnumerable> RetrieveDecryptedStreamAsync( - AsyncDuplexStreamingCall duplexStream, - [EnumeratorCancellation] CancellationToken cancellationToken) - { - await foreach (var decryptResponse in duplexStream.ResponseStream.ReadAllAsync(cancellationToken) - .ConfigureAwait(false)) + finally { - yield return decryptResponse.Payload.Data.Memory; + streamProcessor.OnException -= exceptionHandler; } } /// [Obsolete( - "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + "As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")] + public override IAsyncEnumerable> DecryptAsync(string vaultResourceName, + Stream ciphertextStream, string keyName, CancellationToken cancellationToken = default) => + DecryptAsync(vaultResourceName, ciphertextStream, keyName, new DecryptionOptions(), + cancellationToken); + + /// + [Obsolete( + "As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")] public override async Task> DecryptAsync(string vaultResourceName, ReadOnlyMemory ciphertextBytes, string keyName, DecryptionOptions decryptionOptions, CancellationToken cancellationToken = default) { using var memoryStream = ciphertextBytes.CreateMemoryStream(true); - var decryptionResult = - await DecryptAsync(vaultResourceName, memoryStream, keyName, decryptionOptions, cancellationToken); + var decryptionResult = DecryptAsync(vaultResourceName, memoryStream, keyName, decryptionOptions, cancellationToken); var bufferedResult = new ArrayBufferWriter(); - await foreach (var item in decryptionResult.WithCancellation(cancellationToken)) + await foreach (var item in decryptionResult) { bufferedResult.Write(item.Span); } @@ -1905,7 +1812,7 @@ public override async Task> DecryptAsync(string vaultResour /// [Obsolete( - "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + "As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")] public override async Task> DecryptAsync(string vaultResourceName, ReadOnlyMemory ciphertextBytes, string keyName, CancellationToken cancellationToken = default) => await DecryptAsync(vaultResourceName, ciphertextBytes, keyName, diff --git a/src/Dapr.Common/Dapr.Common.csproj b/src/Dapr.Common/Dapr.Common.csproj index dd34d844b..dac090d3e 100644 --- a/src/Dapr.Common/Dapr.Common.csproj +++ b/src/Dapr.Common/Dapr.Common.csproj @@ -1,7 +1,6 @@  - net6;net7;net8;net9 enable enable diff --git a/src/Dapr.Jobs/DaprJobsGrpcClient.cs b/src/Dapr.Jobs/DaprJobsGrpcClient.cs index 8743aa350..81dc570e3 100644 --- a/src/Dapr.Jobs/DaprJobsGrpcClient.cs +++ b/src/Dapr.Jobs/DaprJobsGrpcClient.cs @@ -163,17 +163,7 @@ public override async Task GetJobAsync(string jobName, Cancellat var envelope = new Autogenerated.GetJobRequest { Name = jobName }; var grpcCallOptions = DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprJobsClient).Assembly, this.DaprApiToken, cancellationToken); var response = await Client.GetJobAlpha1Async(envelope, grpcCallOptions); - var schedule = DateTime.TryParse(response.Job.DueTime, out var dueTime) - ? DaprJobSchedule.FromDateTime(dueTime) - : new DaprJobSchedule(response.Job.Schedule); - - return new DaprJobDetails(schedule) - { - DueTime = !string.IsNullOrWhiteSpace(response.Job.DueTime) ? DateTime.Parse(response.Job.DueTime) : null, - Ttl = !string.IsNullOrWhiteSpace(response.Job.Ttl) ? DateTime.Parse(response.Job.Ttl) : null, - RepeatCount = (int?)response.Job.Repeats, - Payload = response.Job.Data.ToByteArray() - }; + return DeserializeJobResponse(response); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { @@ -192,6 +182,29 @@ public override async Task GetJobAsync(string jobName, Cancellat throw new DaprException("Get job operation failed: the Dapr endpoint did not return the expected value."); } + /// + /// Testable method for performing job response deserialization. + /// + /// + /// This is exposed strictly for testing purposes. + /// + /// The job response to deserialize. + /// The deserialized job response. + internal static DaprJobDetails DeserializeJobResponse(Autogenerated.GetJobResponse response) + { + var schedule = DateTime.TryParse(response.Job.DueTime, out var dueTime) + ? DaprJobSchedule.FromDateTime(dueTime) + : new DaprJobSchedule(response.Job.Schedule); + + return new DaprJobDetails(schedule) + { + DueTime = !string.IsNullOrWhiteSpace(response.Job.DueTime) ? DateTime.Parse(response.Job.DueTime) : null, + Ttl = !string.IsNullOrWhiteSpace(response.Job.Ttl) ? DateTime.Parse(response.Job.Ttl) : null, + RepeatCount = (int?)response.Job.Repeats ?? 0, + Payload = response.Job.Data?.ToByteArray() ?? null + }; + } + /// /// Deletes the specified job. /// diff --git a/src/Dapr.Protos/Dapr.Protos.csproj b/src/Dapr.Protos/Dapr.Protos.csproj index 5331f229c..8a8804b22 100644 --- a/src/Dapr.Protos/Dapr.Protos.csproj +++ b/src/Dapr.Protos/Dapr.Protos.csproj @@ -1,7 +1,6 @@  - net6;net7;net8;net9 enable enable This package contains the reference protos used by develop services using Dapr. diff --git a/src/Dapr.Workflow/Dapr.Workflow.csproj b/src/Dapr.Workflow/Dapr.Workflow.csproj index f24d41e40..360d121ef 100644 --- a/src/Dapr.Workflow/Dapr.Workflow.csproj +++ b/src/Dapr.Workflow/Dapr.Workflow.csproj @@ -3,7 +3,6 @@ - net6;net7;net8;net9 enable Dapr.Workflow Dapr Workflow Authoring SDK diff --git a/src/Directory.Build.props b/src/Directory.Build.props index a74833a37..7c3c2f20e 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -3,7 +3,7 @@ - net6;net8;net9 + net6;net7;net8;net9 $(RepoRoot)bin\$(Configuration)\prod\$(MSBuildProjectName)\ $(OutputPath)$(MSBuildProjectName).xml diff --git a/test/Dapr.Actors.Test/Runtime/ActorManagerTests.cs b/test/Dapr.Actors.Test/Runtime/ActorManagerTests.cs index b27e9afe3..82ec4189e 100644 --- a/test/Dapr.Actors.Test/Runtime/ActorManagerTests.cs +++ b/test/Dapr.Actors.Test/Runtime/ActorManagerTests.cs @@ -12,6 +12,8 @@ // ------------------------------------------------------------------------ using System; +using System.IO; +using System.Text; using System.Threading; using System.Threading.Tasks; using Dapr.Actors.Client; @@ -175,6 +177,160 @@ await Assert.ThrowsAsync(async () => Assert.Equal(1, activator.DeleteCallCount); } + [Fact] + public async Task DeserializeTimer_Period_Iso8601_Time() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"0h0m7s10ms\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Null(result.Ttl); + Assert.Equal(TimeSpan.Zero, result.DueTime); + Assert.Equal(TimeSpan.FromSeconds(7).Add(TimeSpan.FromMilliseconds(10)), result.Period); + } + + [Fact] + public async Task DeserializeTimer_Period_DaprFormat_Every() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@every 15s\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Null(result.Ttl); + Assert.Equal(TimeSpan.Zero, result.DueTime); + Assert.Equal(TimeSpan.FromSeconds(15), result.Period); + } + + [Fact] + public async Task DeserializeTimer_Period_DaprFormat_Every2() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@every 3h2m15s\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Null(result.Ttl); + Assert.Equal(TimeSpan.Zero, result.DueTime); + Assert.Equal(TimeSpan.FromHours(3).Add(TimeSpan.FromMinutes(2)).Add(TimeSpan.FromSeconds(15)), result.Period); + } + + [Fact] + public async Task DeserializeTimer_Period_DaprFormat_Monthly() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@monthly\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Null(result.Ttl); + Assert.Equal(TimeSpan.Zero, result.DueTime); + Assert.Equal(TimeSpan.FromDays(30), result.Period); + } + + [Fact] + public async Task DeserializeTimer_Period_DaprFormat_Weekly() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@weekly\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Null(result.Ttl); + Assert.Equal(TimeSpan.Zero, result.DueTime); + Assert.Equal(TimeSpan.FromDays(7), result.Period); + } + + [Fact] + public async Task DeserializeTimer_Period_DaprFormat_Daily() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@daily\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Null(result.Ttl); + Assert.Equal(TimeSpan.Zero, result.DueTime); + Assert.Equal(TimeSpan.FromDays(1), result.Period); + } + + [Fact] + public async Task DeserializeTimer_Period_DaprFormat_Hourly() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@hourly\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Null(result.Ttl); + Assert.Equal(TimeSpan.Zero, result.DueTime); + Assert.Equal(TimeSpan.FromHours(1), result.Period); + } + + [Fact] + public async Task DeserializeTimer_DueTime_DaprFormat_Hourly() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"dueTime\": \"@hourly\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Null(result.Ttl); + Assert.Equal(TimeSpan.FromHours(1), result.DueTime); + Assert.Equal(TimeSpan.Zero, result.Period); + } + + [Fact] + public async Task DeserializeTimer_DueTime_Iso8601Times() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"dueTime\": \"0h0m7s10ms\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Null(result.Ttl); + Assert.Equal(TimeSpan.Zero, result.Period); + Assert.Equal(TimeSpan.FromSeconds(7).Add(TimeSpan.FromMilliseconds(10)), result.DueTime); + } + + [Fact] + public async Task DeserializeTimer_Ttl_DaprFormat_Hourly() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"ttl\": \"@hourly\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Equal(TimeSpan.Zero, result.DueTime); + Assert.Equal(TimeSpan.Zero, result.Period); + Assert.Equal(TimeSpan.FromHours(1), result.Ttl); + } + + [Fact] + public async Task DeserializeTimer_Ttl_Iso8601Times() + { + const string timerJson = "{\"callback\": \"TimerCallback\", \"ttl\": \"0h0m7s10ms\"}"; + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson)); + var result = await ActorManager.DeserializeAsync(stream); + + Assert.Equal("TimerCallback", result.Callback); + Assert.Equal(Array.Empty(), result.Data); + Assert.Equal(TimeSpan.Zero, result.DueTime); + Assert.Equal(TimeSpan.Zero, result.Period); + Assert.Equal(TimeSpan.FromSeconds(7).Add(TimeSpan.FromMilliseconds(10)), result.Ttl); + } + private interface ITestActor : IActor { } private class TestActor : Actor, ITestActor, IDisposable diff --git a/test/Dapr.Client.Test/CryptographyApiTest.cs b/test/Dapr.Client.Test/CryptographyApiTest.cs index a7d57a096..f74362b1e 100644 --- a/test/Dapr.Client.Test/CryptographyApiTest.cs +++ b/test/Dapr.Client.Test/CryptographyApiTest.cs @@ -1,5 +1,4 @@ using System; -using System.IO; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -30,28 +29,6 @@ await Assert.ThrowsAsync(async () => await client.EncryptAsyn (ReadOnlyMemory) Array.Empty(), keyName, new EncryptionOptions(KeyWrapAlgorithm.Rsa), CancellationToken.None)); } - [Fact] - public async Task EncryptAsync_Stream_VaultResourceName_ArgumentVerifierException() - { - var client = new DaprClientBuilder().Build(); - const string vaultResourceName = ""; - //Get response and validate - await Assert.ThrowsAsync(async () => await client.EncryptAsync(vaultResourceName, - new MemoryStream(), "MyKey", new EncryptionOptions(KeyWrapAlgorithm.Rsa), - CancellationToken.None)); - } - - [Fact] - public async Task EncryptAsync_Stream_KeyName_ArgumentVerifierException() - { - var client = new DaprClientBuilder().Build(); - const string keyName = ""; - //Get response and validate - await Assert.ThrowsAsync(async () => await client.EncryptAsync("myVault", - (Stream) new MemoryStream(), keyName, new EncryptionOptions(KeyWrapAlgorithm.Rsa), - CancellationToken.None)); - } - [Fact] public async Task DecryptAsync_ByteArray_VaultResourceName_ArgumentVerifierException() { @@ -71,25 +48,5 @@ public async Task DecryptAsync_ByteArray_KeyName_ArgumentVerifierException() await Assert.ThrowsAsync(async () => await client.DecryptAsync("myVault", Array.Empty(), keyName, new DecryptionOptions(), CancellationToken.None)); } - - [Fact] - public async Task DecryptAsync_Stream_VaultResourceName_ArgumentVerifierException() - { - var client = new DaprClientBuilder().Build(); - const string vaultResourceName = ""; - //Get response and validate - await Assert.ThrowsAsync(async () => await client.DecryptAsync(vaultResourceName, - new MemoryStream(), "MyKey", new DecryptionOptions(), CancellationToken.None)); - } - - [Fact] - public async Task DecryptAsync_Stream_KeyName_ArgumentVerifierException() - { - var client = new DaprClientBuilder().Build(); - const string keyName = ""; - //Get response and validate - await Assert.ThrowsAsync(async () => await client.DecryptAsync("myVault", - new MemoryStream(), keyName, new DecryptionOptions(), CancellationToken.None)); - } } } diff --git a/test/Dapr.Jobs.Test/DaprJobsGrpcClientTests.cs b/test/Dapr.Jobs.Test/DaprJobsGrpcClientTests.cs index 4f6168830..428b761c9 100644 --- a/test/Dapr.Jobs.Test/DaprJobsGrpcClientTests.cs +++ b/test/Dapr.Jobs.Test/DaprJobsGrpcClientTests.cs @@ -13,6 +13,7 @@ using System; using System.Net.Http; +using Dapr.Client.Autogen.Grpc.v1; using Dapr.Jobs.Models; using Moq; using Xunit; @@ -167,6 +168,21 @@ public void DeleteJobAsync_NameCannotBeEmpty() }); #pragma warning restore CS0618 // Type or member is obsolete } + + [Fact] + public void ShouldDeserialize_EveryExpression() + { + const string scheduleText = "@every 1m"; + var response = new GetJobResponse { Job = new Job { Name = "test", Schedule = scheduleText } }; + var schedule = DaprJobSchedule.FromExpression(scheduleText); + + var jobDetails = DaprJobsGrpcClient.DeserializeJobResponse(response); + Assert.Null(jobDetails.Payload); + Assert.Equal(0, jobDetails.RepeatCount); + Assert.Null(jobDetails.Ttl); + Assert.Null(jobDetails.DueTime); + Assert.Equal(jobDetails.Schedule.ExpressionValue, schedule.ExpressionValue); + } private sealed record TestPayload(string Name, string Color); }