Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2168 WatchKube discovery provider #2173

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@ namespace Ocelot.Provider.Kubernetes
{
public class EndPointClientV1 : KubeResourceClient, IEndPointClient
{
private readonly HttpRequest _collection;
private readonly HttpRequest _byName;
private readonly HttpRequest _watchByName;

public EndPointClientV1(IKubeApiClient client) : base(client)
{
_collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
_byName = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
_watchByName = KubeRequest.Create("api/v1/watch/namespaces/{Namespace}/endpoints/{ServiceName}");
}

public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default)
public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(serviceName))
{
throw new ArgumentNullException(nameof(serviceName));
}

var request = _collection
var request = _byName
.WithTemplateParameters(new
{
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
Expand All @@ -34,5 +37,23 @@ public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace
? await response.ReadContentAsAsync<EndpointsV1>()
: null;
}

public IObservable<IResourceEventV1<EndpointsV1>> Watch(string serviceName, string kubeNamespace,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(serviceName))
{
throw new ArgumentNullException(nameof(serviceName));
}

return ObserveEvents<EndpointsV1>(
_watchByName.WithTemplateParameters(new
{
ServiceName = serviceName,
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
}),
"watch v1/Endpoints '" + serviceName + "' in namespace " +
(kubeNamespace ?? KubeClient.DefaultNamespace));
}
}
}
2 changes: 2 additions & 0 deletions src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ namespace Ocelot.Provider.Kubernetes.Interfaces;
public interface IEndPointClient : IKubeResourceClient
{
Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default);

IObservable<IResourceEventV1<EndpointsV1>> Watch(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/Ocelot.Provider.Kubernetes/Kube.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public virtual async Task<List<Service>> GetAsync()
}

private Task<EndpointsV1> GetEndpoint() => _kubeApi
.ResourceClient(client => new EndPointClientV1(client))
.EndpointsV1()
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);

private bool CheckErroneousState(EndpointsV1 endpoint)
Expand Down
9 changes: 9 additions & 0 deletions src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Ocelot.Provider.Kubernetes.Interfaces;

namespace Ocelot.Provider.Kubernetes;

public static class KubeApiClientExtensions
{
public static IEndPointClient EndpointsV1(this IKubeApiClient client)
=> client.ResourceClient(x => new EndPointClientV1(client));
}
29 changes: 18 additions & 11 deletions src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using Ocelot.Configuration;
using Ocelot.Logging;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes.Interfaces;

namespace Ocelot.Provider.Kubernetes
{
public static class KubernetesProviderFactory
{
/// <summary>
/// String constant used for provider type definition.
{
/// <summary>
/// String constant used for provider type definition.
/// </summary>
public const string PollKube = nameof(Kubernetes.PollKube);

public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider;

public const string WatchKube = nameof(Kubernetes.WatchKube);

public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider;

private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamRoute route)
{
Expand All @@ -27,11 +29,16 @@ private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provide
Scheme = route.DownstreamScheme,
};

if (WatchKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase))
{
return new WatchKube(configuration, factory, kubeClient, serviceBuilder);
}

var defaultK8sProvider = new Kube(configuration, factory, kubeClient, serviceBuilder);

return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)
? new PollKube(config.PollingInterval, factory, defaultK8sProvider)
: defaultK8sProvider;
return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)
? new PollKube(config.PollingInterval, factory, defaultK8sProvider)
: defaultK8sProvider;
}
}
}
78 changes: 78 additions & 0 deletions src/Ocelot.Provider.Kubernetes/WatchKube.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using KubeClient.Models;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes.Interfaces;
using Ocelot.Values;

namespace Ocelot.Provider.Kubernetes;

// Dispose() won't be called because provider wasn't resolved from DI
public class WatchKube : IServiceDiscoveryProvider, IDisposable
{
private readonly KubeRegistryConfiguration _configuration;
private readonly IOcelotLogger _logger;
private readonly IKubeApiClient _kubeApi;
private readonly IKubeServiceBuilder _serviceBuilder;

private List<Service> _services = null;
private readonly IDisposable _subscription;

public WatchKube(
KubeRegistryConfiguration configuration,
IOcelotLoggerFactory factory,
IKubeApiClient kubeApi,
IKubeServiceBuilder serviceBuilder)
{
_configuration = configuration;
_logger = factory.CreateLogger<Kube>();
_kubeApi = kubeApi;
_serviceBuilder = serviceBuilder;

_subscription = CreateSubscription();
}

public virtual async Task<List<Service>> GetAsync()
{
// need to wait for first result fetching somehow
if (_services is null)
{
await Task.Delay(1000);
}

if (_services is not { Count: > 0 })
{
_logger.LogWarning(() => GetMessage("Subscription to service endpoints gave no results!"));
}

return _services;
}

private IDisposable CreateSubscription() =>
_kubeApi
.EndpointsV1()
.Watch(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace)
.Subscribe(
onNext: endpointEvent =>
{
_services = endpointEvent.EventType switch
{
ResourceEventType.Deleted or ResourceEventType.Error => new(),
_ when (endpointEvent.Resource?.Subsets?.Count ?? 0) == 0 => new(),
_ => _serviceBuilder.BuildServices(_configuration, endpointEvent.Resource).ToList(),
};
},
onError: ex =>
{
// recreate subscription in case of exceptions?
_logger.LogError(() => GetMessage("Endpoints subscription error occured"), ex);
},
onCompleted: () =>
{
// called only when subscription is cancelled
_logger.LogWarning(() => GetMessage("Subscription to service endpoints completed"));
});

private string GetMessage(string message)
=> $"{nameof(WatchKube)} provider. Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; {message}";

public void Dispose() => _subscription.Dispose();
}