Skip to content

WIP: Add gRPC keep alive #1543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: release-1.16
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 75 additions & 4 deletions src/Dapr.Client/DaprClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ public DaprClientBuilder()
this.GrpcEndpoint = DaprDefaults.GetDefaultGrpcEndpoint();
this.HttpEndpoint = DaprDefaults.GetDefaultHttpEndpoint();

this.GrpcChannelOptions = new GrpcChannelOptions()
{
this.GrpcKeepAliveEnabled = DaprDefaults.GetDefaultGrpcKeepAliveEnable();
this.GrpcKeepAliveTime = TimeSpan.FromSeconds(DaprDefaults.GetDefaultGrpcKeepAliveTimeSeconds());
this.GrpcKeepAliveTimeout = TimeSpan.FromSeconds(DaprDefaults.GetDefaultGrpcKeepAliveTimeoutSeconds());
this.GrpcKeepAlivePermitWithoutCalls = DaprDefaults.GetDefaultGrpcKeepAliveWithoutCalls();

this.GrpcChannelOptions = new GrpcChannelOptions
{
// The gRPC client doesn't throw the right exception for cancellation
// by default, this switches that behavior on.
ThrowOperationCanceledOnCancellation = true,
Expand All @@ -57,7 +62,12 @@ public DaprClientBuilder()
// property exposed for testing purposes
internal GrpcChannelOptions GrpcChannelOptions { get; private set; }
internal string DaprApiToken { get; private set; }
internal TimeSpan Timeout { get; private set; }
internal TimeSpan Timeout { get; private set; }

internal bool GrpcKeepAliveEnabled { get; private set; }
internal TimeSpan GrpcKeepAliveTime { get; private set; }
internal TimeSpan GrpcKeepAliveTimeout { get; private set; }
internal bool GrpcKeepAlivePermitWithoutCalls { get; private set; }

/// <summary>
/// Overrides the HTTP endpoint used by <see cref="DaprClient" /> for communicating with the Dapr runtime.
Expand Down Expand Up @@ -148,6 +158,50 @@ public DaprClientBuilder UseTimeout(TimeSpan timeout)
return this;
}

/// <summary>
/// Enables or disables gRPC keep-alive.
/// </summary>
/// <param name="enabled">Whether to enable gRPC keep-alive.</param>
/// <returns>The <see cref="DaprClientBuilder" /> instance.</returns>
public DaprClientBuilder UseGrpcKeepAlive(bool enabled)
{
this.GrpcKeepAliveEnabled = enabled;
return this;
}

/// <summary>
/// Sets the gRPC keep-alive time interval.
/// </summary>
/// <param name="keepAliveTime">The time interval between keep-alive pings.</param>
/// <returns>The <see cref="DaprClientBuilder" /> instance.</returns>
public DaprClientBuilder UseGrpcKeepAliveTime(TimeSpan keepAliveTime)
{
this.GrpcKeepAliveTime = keepAliveTime;
return this;
}

/// <summary>
/// Sets the gRPC keep-alive timeout.
/// </summary>
/// <param name="keepAliveTimeout">The time to wait for a keep-alive ping response before considering the connection dead.</param>
/// <returns>The <see cref="DaprClientBuilder" /> instance.</returns>
public DaprClientBuilder UseGrpcKeepAliveTimeout(TimeSpan keepAliveTimeout)
{
this.GrpcKeepAliveTimeout = keepAliveTimeout;
return this;
}

/// <summary>
/// Sets whether gRPC keep-alive should be sent when there are no active calls.
/// </summary>
/// <param name="permitWithoutCalls">Whether to send keep-alive pings even when there are no active calls.</param>
/// <returns>The <see cref="DaprClientBuilder" /> instance.</returns>
public DaprClientBuilder UseGrpcKeepAlivePermitWithoutCalls(bool permitWithoutCalls)
{
this.GrpcKeepAlivePermitWithoutCalls = permitWithoutCalls;
return this;
}

/// <summary>
/// Builds a <see cref="DaprClient" /> instance from the properties of the builder.
/// </summary>
Expand All @@ -172,13 +226,30 @@ public DaprClient Build()
throw new InvalidOperationException("The HTTP endpoint must use http or https.");
}

if (this.GrpcKeepAliveEnabled)
{
if (!(this.GrpcChannelOptions.HttpHandler is SocketsHttpHandler))
{
var handler = new SocketsHttpHandler();
this.GrpcChannelOptions.HttpHandler = handler;
}

var socketsHandler = (SocketsHttpHandler)this.GrpcChannelOptions.HttpHandler;

socketsHandler.KeepAlivePingDelay = this.GrpcKeepAliveTime;
socketsHandler.KeepAlivePingTimeout = this.GrpcKeepAliveTimeout;
socketsHandler.KeepAlivePingPolicy = this.GrpcKeepAlivePermitWithoutCalls
? HttpKeepAlivePingPolicy.Always
: HttpKeepAlivePingPolicy.WithActiveRequests;
}

var channel = GrpcChannel.ForAddress(this.GrpcEndpoint, this.GrpcChannelOptions);
var client = new Autogenerated.Dapr.DaprClient(channel);


var apiTokenHeader = DaprClient.GetDaprApiTokenHeader(this.DaprApiToken);
var httpClient = HttpClientFactory is object ? HttpClientFactory() : new HttpClient();

if (this.Timeout > TimeSpan.Zero)
{
httpClient.Timeout = this.Timeout;
Expand Down
52 changes: 52 additions & 0 deletions src/Dapr.Common/DaprDefaults.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,19 @@ internal static class DaprDefaults
public const string DaprHttpPortName = "DAPR_HTTP_PORT";
public const string DaprGrpcEndpointName = "DAPR_GRPC_ENDPOINT";
public const string DaprGrpcPortName = "DAPR_GRPC_PORT";
public const string DaprGrpcKeepAliveEnableName = "DAPR_ENABLE_KEEP_ALIVE";
public const string DaprGrpcKeepAliveTimeName = "DAPR_KEEP_ALIVE_TIME";
public const string DaprGrpcKeepAliveTimeoutName = "DAPR_KEEP_ALIVE_TIMEOUT";
public const string DaprGrpcKeepAliveWithoutCallsName = "DAPR_KEEP_ALIVE_WITHOUT_CALLS";

public const string DefaultDaprScheme = "http";
public const string DefaultDaprHost = "localhost";
public const int DefaultHttpPort = 3500;
public const int DefaultGrpcPort = 50001;
public const bool DefaultGrpcKeepAliveEnable = false;
public const int DefaultGrpcKeepAliveTimeSeconds = 60;
public const int DefaultGrpcKeepAliveTimeoutSeconds = 20;
public const bool DefaultGrpcKeepAliveWithoutCalls = true;

/// <summary>
/// Get the value of environment variable DAPR_API_TOKEN
Expand Down Expand Up @@ -130,4 +138,48 @@ private static string BuildEndpoint(string? endpoint, int endpointPort)
//Fall back to the environment variable with the same name or default to an empty string
return Environment.GetEnvironmentVariable(name);
}

/// <summary>
/// Get whether gRPC keep-alive is enabled based on environment variables.
/// </summary>
/// <param name="configuration">The optional <see cref="IConfiguration"/> to pull the value from.</param>
/// <returns>A boolean indicating whether gRPC keep-alive is enabled.</returns>
public static bool GetDefaultGrpcKeepAliveEnable(IConfiguration? configuration = null)
{
var value = GetResourceValue(configuration, DaprGrpcKeepAliveEnableName);
return string.IsNullOrWhiteSpace(value) ? DefaultGrpcKeepAliveEnable : bool.Parse(value);
}

/// <summary>
/// Get the gRPC keep-alive time in seconds based on environment variables.
/// </summary>
/// <param name="configuration">The optional <see cref="IConfiguration"/> to pull the value from.</param>
/// <returns>The gRPC keep-alive time in seconds.</returns>
public static int GetDefaultGrpcKeepAliveTimeSeconds(IConfiguration? configuration = null)
{
var value = GetResourceValue(configuration, DaprGrpcKeepAliveTimeName);
return string.IsNullOrWhiteSpace(value) ? DefaultGrpcKeepAliveTimeSeconds : int.Parse(value);
}

/// <summary>
/// Get the gRPC keep-alive timeout in seconds based on environment variables.
/// </summary>
/// <param name="configuration">The optional <see cref="IConfiguration"/> to pull the value from.</param>
/// <returns>The gRPC keep-alive timeout in seconds.</returns>
public static int GetDefaultGrpcKeepAliveTimeoutSeconds(IConfiguration? configuration = null)
{
var value = GetResourceValue(configuration, DaprGrpcKeepAliveTimeoutName);
return string.IsNullOrWhiteSpace(value) ? DefaultGrpcKeepAliveTimeoutSeconds : int.Parse(value);
}

/// <summary>
/// Get whether gRPC keep-alive should be sent without calls based on environment variables.
/// </summary>
/// <param name="configuration">The optional <see cref="IConfiguration"/> to pull the value from.</param>
/// <returns>A boolean indicating whether gRPC keep-alive should be sent without calls.</returns>
public static bool GetDefaultGrpcKeepAliveWithoutCalls(IConfiguration? configuration = null)
{
var value = GetResourceValue(configuration, DaprGrpcKeepAliveWithoutCallsName);
return string.IsNullOrWhiteSpace(value) ? DefaultGrpcKeepAliveWithoutCalls : bool.Parse(value);
}
}
99 changes: 94 additions & 5 deletions src/Dapr.Common/DaprGenericClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System;
using System;
using System.Reflection;
using System.Text.Json;
using System.Threading;
using Grpc.Net.Client;
using Microsoft.Extensions.Configuration;

Expand All @@ -31,6 +34,11 @@ protected DaprGenericClientBuilder(IConfiguration? configuration = null)
this.GrpcEndpoint = DaprDefaults.GetDefaultGrpcEndpoint();
this.HttpEndpoint = DaprDefaults.GetDefaultHttpEndpoint();

this.GrpcKeepAliveEnabled = DaprDefaults.GetDefaultGrpcKeepAliveEnable(configuration);
this.GrpcKeepAliveTime = TimeSpan.FromSeconds(DaprDefaults.GetDefaultGrpcKeepAliveTimeSeconds(configuration));
this.GrpcKeepAliveTimeout = TimeSpan.FromSeconds(DaprDefaults.GetDefaultGrpcKeepAliveTimeoutSeconds(configuration));
this.GrpcKeepAlivePermitWithoutCalls = DaprDefaults.GetDefaultGrpcKeepAliveWithoutCalls(configuration);

this.GrpcChannelOptions = new GrpcChannelOptions()
{
// The gRPC client doesn't throw the right exception for cancellation
Expand Down Expand Up @@ -71,12 +79,32 @@ protected DaprGenericClientBuilder(IConfiguration? configuration = null)
/// Property exposed for testing purposes.
/// </summary>
public string DaprApiToken { get; private set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
internal TimeSpan Timeout { get; private set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
internal bool GrpcKeepAliveEnabled { get; private set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
internal TimeSpan GrpcKeepAliveTime { get; private set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
internal TimeSpan GrpcKeepAliveTimeout { get; private set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
internal bool GrpcKeepAlivePermitWithoutCalls { get; private set; }

/// <summary>
/// Overrides the HTTP endpoint used by the Dapr client for communicating with the Dapr runtime.
/// </summary>
Expand Down Expand Up @@ -180,6 +208,50 @@ public DaprGenericClientBuilder<TClientBuilder> UseTimeout(TimeSpan timeout)
return this;
}

/// <summary>
/// Enables or disables gRPC keep-alive.
/// </summary>
/// <param name="enabled">Whether to enable gRPC keep-alive.</param>
/// <returns>The <see cref="DaprGenericClientBuilder{TClientBuilder}" /> instance.</returns>
public DaprGenericClientBuilder<TClientBuilder> UseGrpcKeepAlive(bool enabled)
{
this.GrpcKeepAliveEnabled = enabled;
return this;
}

/// <summary>
/// Sets the gRPC keep-alive time interval.
/// </summary>
/// <param name="keepAliveTime">The time interval between keep-alive pings.</param>
/// <returns>The <see cref="DaprGenericClientBuilder{TClientBuilder}" /> instance.</returns>
public DaprGenericClientBuilder<TClientBuilder> UseGrpcKeepAliveTime(TimeSpan keepAliveTime)
{
this.GrpcKeepAliveTime = keepAliveTime;
return this;
}

/// <summary>
/// Sets the gRPC keep-alive timeout.
/// </summary>
/// <param name="keepAliveTimeout">The time to wait for a keep-alive ping response before considering the connection dead.</param>
/// <returns>The <see cref="DaprGenericClientBuilder{TClientBuilder}" /> instance.</returns>
public DaprGenericClientBuilder<TClientBuilder> UseGrpcKeepAliveTimeout(TimeSpan keepAliveTimeout)
{
this.GrpcKeepAliveTimeout = keepAliveTimeout;
return this;
}

/// <summary>
/// Sets whether gRPC keep-alive should be sent when there are no active calls.
/// </summary>
/// <param name="permitWithoutCalls">Whether to send keep-alive pings even when there are no active calls.</param>
/// <returns>The <see cref="DaprGenericClientBuilder{TClientBuilder}" /> instance.</returns>
public DaprGenericClientBuilder<TClientBuilder> UseGrpcKeepAlivePermitWithoutCalls(bool permitWithoutCalls)
{
this.GrpcKeepAlivePermitWithoutCalls = permitWithoutCalls;
return this;
}

/// <summary>
/// Builds out the inner DaprClient that provides the core shape of the
/// runtime gRPC client used by the consuming package.
Expand Down Expand Up @@ -209,8 +281,25 @@ protected internal (GrpcChannel channel, HttpClient httpClient, Uri httpEndpoint
//Configure the HTTP client
var httpClient = ConfigureHttpClient(assembly);
this.GrpcChannelOptions.HttpClient = httpClient;

if (this.GrpcKeepAliveEnabled)
{
if (!(this.GrpcChannelOptions.HttpHandler is SocketsHttpHandler))
{
var handler = new SocketsHttpHandler();
this.GrpcChannelOptions.HttpHandler = handler;
}

var socketsHandler = (SocketsHttpHandler)this.GrpcChannelOptions.HttpHandler;

socketsHandler.KeepAlivePingDelay = this.GrpcKeepAliveTime;
socketsHandler.KeepAlivePingTimeout = this.GrpcKeepAliveTimeout;
socketsHandler.KeepAlivePingPolicy = this.GrpcKeepAlivePermitWithoutCalls
? HttpKeepAlivePingPolicy.Always
: HttpKeepAlivePingPolicy.WithActiveRequests;
}

var channel = GrpcChannel.ForAddress(this.GrpcEndpoint, this.GrpcChannelOptions);
var channel = GrpcChannel.ForAddress(this.GrpcEndpoint, this.GrpcChannelOptions);
return (channel, httpClient, httpEndpoint, this.DaprApiToken);
}

Expand All @@ -222,17 +311,17 @@ protected internal (GrpcChannel channel, HttpClient httpClient, Uri httpEndpoint
private HttpClient ConfigureHttpClient(Assembly assembly)
{
var httpClient = HttpClientFactory is not null ? HttpClientFactory() : new HttpClient();

//Set the timeout as necessary
if (this.Timeout > TimeSpan.Zero)
{
httpClient.Timeout = this.Timeout;
}

//Set the user agent
var userAgent = DaprClientUtilities.GetUserAgent(assembly);
httpClient.DefaultRequestHeaders.Add("User-Agent", userAgent.ToString());

//Set the API token
var apiTokenHeader = DaprClientUtilities.GetDaprApiTokenHeader(this.DaprApiToken);
if (apiTokenHeader is not null)
Expand Down