diff --git a/src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs b/src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs index 83418957b..a83f00c11 100644 --- a/src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs +++ b/src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs @@ -7,21 +7,24 @@ namespace Ocelot.Provider.Kubernetes { public class EndPointClientV1 : KubeResourceClient, IEndPointClient { - private readonly HttpRequest _collection; + private readonly HttpRequest _byName; + private readonly HttpRequest _watchByName; public EndPointClientV1(IKubeApiClient client) : base(client) { - _collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}"); + _byName = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}"); + _watchByName = KubeRequest.Create("api/v1/watch/namespaces/{Namespace}/endpoints/{ServiceName}"); } - public async Task GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default) + public async Task GetAsync(string serviceName, string kubeNamespace = null, + CancellationToken cancellationToken = default) { if (string.IsNullOrEmpty(serviceName)) { throw new ArgumentNullException(nameof(serviceName)); } - var request = _collection + var request = _byName .WithTemplateParameters(new { Namespace = kubeNamespace ?? KubeClient.DefaultNamespace, @@ -34,5 +37,23 @@ public async Task GetAsync(string serviceName, string kubeNamespace ? await response.ReadContentAsAsync() : null; } + + public IObservable> Watch(string serviceName, string kubeNamespace, + CancellationToken cancellationToken = default) + { + if (string.IsNullOrEmpty(serviceName)) + { + throw new ArgumentNullException(nameof(serviceName)); + } + + return ObserveEvents( + _watchByName.WithTemplateParameters(new + { + ServiceName = serviceName, + Namespace = kubeNamespace ?? KubeClient.DefaultNamespace, + }), + "watch v1/Endpoints '" + serviceName + "' in namespace " + + (kubeNamespace ?? KubeClient.DefaultNamespace)); + } } } diff --git a/src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs b/src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs index 10f79f8af..8e85df73f 100644 --- a/src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs +++ b/src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs @@ -6,4 +6,6 @@ namespace Ocelot.Provider.Kubernetes.Interfaces; public interface IEndPointClient : IKubeResourceClient { Task GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default); + + IObservable> Watch(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default); } diff --git a/src/Ocelot.Provider.Kubernetes/Kube.cs b/src/Ocelot.Provider.Kubernetes/Kube.cs index 39dca0b41..e52c4c9e4 100644 --- a/src/Ocelot.Provider.Kubernetes/Kube.cs +++ b/src/Ocelot.Provider.Kubernetes/Kube.cs @@ -47,7 +47,7 @@ public virtual async Task> GetAsync() } private Task GetEndpoint() => _kubeApi - .ResourceClient(client => new EndPointClientV1(client)) + .EndpointsV1() .GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace); private bool CheckErroneousState(EndpointsV1 endpoint) diff --git a/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions.cs b/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions.cs new file mode 100644 index 000000000..a9d6dcccc --- /dev/null +++ b/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions.cs @@ -0,0 +1,9 @@ +using Ocelot.Provider.Kubernetes.Interfaces; + +namespace Ocelot.Provider.Kubernetes; + +public static class KubeApiClientExtensions +{ + public static IEndPointClient EndpointsV1(this IKubeApiClient client) + => client.ResourceClient(x => new EndPointClientV1(client)); +} diff --git a/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs b/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs index a3a1d48c0..c96c04834 100644 --- a/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs +++ b/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs @@ -1,18 +1,20 @@ -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Ocelot.Configuration; -using Ocelot.Logging; +using Ocelot.Logging; using Ocelot.Provider.Kubernetes.Interfaces; namespace Ocelot.Provider.Kubernetes { public static class KubernetesProviderFactory - { - /// - /// String constant used for provider type definition. + { + /// + /// String constant used for provider type definition. /// public const string PollKube = nameof(Kubernetes.PollKube); - - public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider; + + public const string WatchKube = nameof(Kubernetes.WatchKube); + + public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider; private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamRoute route) { @@ -27,11 +29,16 @@ private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provide Scheme = route.DownstreamScheme, }; + if (WatchKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)) + { + return new WatchKube(configuration, factory, kubeClient, serviceBuilder); + } + var defaultK8sProvider = new Kube(configuration, factory, kubeClient, serviceBuilder); - - return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase) - ? new PollKube(config.PollingInterval, factory, defaultK8sProvider) - : defaultK8sProvider; + + return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase) + ? new PollKube(config.PollingInterval, factory, defaultK8sProvider) + : defaultK8sProvider; } } } diff --git a/src/Ocelot.Provider.Kubernetes/WatchKube.cs b/src/Ocelot.Provider.Kubernetes/WatchKube.cs new file mode 100644 index 000000000..cb178f26d --- /dev/null +++ b/src/Ocelot.Provider.Kubernetes/WatchKube.cs @@ -0,0 +1,78 @@ +using KubeClient.Models; +using Ocelot.Logging; +using Ocelot.Provider.Kubernetes.Interfaces; +using Ocelot.Values; + +namespace Ocelot.Provider.Kubernetes; + +// Dispose() won't be called because provider wasn't resolved from DI +public class WatchKube : IServiceDiscoveryProvider, IDisposable +{ + private readonly KubeRegistryConfiguration _configuration; + private readonly IOcelotLogger _logger; + private readonly IKubeApiClient _kubeApi; + private readonly IKubeServiceBuilder _serviceBuilder; + + private List _services = null; + private readonly IDisposable _subscription; + + public WatchKube( + KubeRegistryConfiguration configuration, + IOcelotLoggerFactory factory, + IKubeApiClient kubeApi, + IKubeServiceBuilder serviceBuilder) + { + _configuration = configuration; + _logger = factory.CreateLogger(); + _kubeApi = kubeApi; + _serviceBuilder = serviceBuilder; + + _subscription = CreateSubscription(); + } + + public virtual async Task> GetAsync() + { + // need to wait for first result fetching somehow + if (_services is null) + { + await Task.Delay(1000); + } + + if (_services is not { Count: > 0 }) + { + _logger.LogWarning(() => GetMessage("Subscription to service endpoints gave no results!")); + } + + return _services; + } + + private IDisposable CreateSubscription() => + _kubeApi + .EndpointsV1() + .Watch(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace) + .Subscribe( + onNext: endpointEvent => + { + _services = endpointEvent.EventType switch + { + ResourceEventType.Deleted or ResourceEventType.Error => new(), + _ when (endpointEvent.Resource?.Subsets?.Count ?? 0) == 0 => new(), + _ => _serviceBuilder.BuildServices(_configuration, endpointEvent.Resource).ToList(), + }; + }, + onError: ex => + { + // recreate subscription in case of exceptions? + _logger.LogError(() => GetMessage("Endpoints subscription error occured"), ex); + }, + onCompleted: () => + { + // called only when subscription is cancelled + _logger.LogWarning(() => GetMessage("Subscription to service endpoints completed")); + }); + + private string GetMessage(string message) + => $"{nameof(WatchKube)} provider. Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; {message}"; + + public void Dispose() => _subscription.Dispose(); +}