diff --git a/pkg/ha/ha_service.go b/pkg/ha/ha_service.go index 2ce74c0..ef33b34 100644 --- a/pkg/ha/ha_service.go +++ b/pkg/ha/ha_service.go @@ -12,7 +12,8 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" ctlmgr "sigs.k8s.io/controller-runtime/pkg/manager" @@ -68,7 +69,7 @@ func (ha *HAService) setEndpoints(ctx context.Context) error { // Bypass client cache to avoid triggering a cluster wide list-watch for Endpoints - our RBAC does not allow it err := ha.manager.GetAPIReader().Get(ctx, client.ObjectKey{Namespace: ha.namespace, Name: app.Name}, &endpoints) if err != nil { - if !errors.IsNotFound(err) { + if !apierrors.IsNotFound(err) { return fmt.Errorf("updating the service endpoint to point to the new leader: retrieving endpoints: %w", err) } @@ -98,6 +99,7 @@ func (ha *HAService) Start(ctx context.Context) error { select { case <-ctx.Done(): + _ = ha.cleanUpServiceEndpoints() return fmt.Errorf("starting HA service: %w", ctx.Err()) case <-ha.testIsolation.TimeAfter(retryPeriod): } @@ -108,5 +110,86 @@ func (ha *HAService) Start(ctx context.Context) error { } } - return nil + <-ctx.Done() + err := ha.cleanUpServiceEndpoints() + if err == nil { + err = ctx.Err() + } + return err +} + +// cleanUpServiceEndpoints is executed upon ending leadership. Its purpose is to remove the Endpoints object created upon acquiring +// leadership. +// It returns nil if the object is gone or was left in place because it no longer points to this replica, and an error +// if the object could not be deleted within the retry/time budget. +func (ha *HAService) cleanUpServiceEndpoints() error { + // Use our own context. This function executes when the main application context is closed. + // Also, try to finish before a potential 15 seconds termination grace timeout. 
+ ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second) + defer cancel() + seedClient := ha.manager.GetClient() + + attempt := 0 + var err error + for { + endpoints := corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: app.Name, + Namespace: ha.namespace, + }, + } + // Bypass the client cache, same as setEndpoints does: a cached read would need a list-watch on Endpoints. + err = ha.manager.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(&endpoints), &endpoints) + if err != nil { + if apierrors.IsNotFound(err) { + ha.log.V(app.VerbosityVerbose).Info("The endpoints object cleanup succeeded: the object was missing") + return nil + } + + ha.log.V(app.VerbosityInfo).Info("Failed to retrieve the endpoints object", "error", err.Error()) + } else { + // Avoid data race. We don't want to delete the endpoint if it is sending traffic to a replica other than this one. + if !ha.isEndpointStillPointingToOurReplica(&endpoints) { + // Someone else is using the endpoint. We can't perform safe cleanup. Abandon the object. + ha.log.V(app.VerbosityWarning).Info( + "Abandoning endpoints object because it was modified by an external actor") + return nil + } + + // Only delete the endpoint if it is the resource version for which we confirmed that it points to us. + deletionPrecondition := client.Preconditions{UID: &endpoints.UID, ResourceVersion: &endpoints.ResourceVersion} + err = seedClient.Delete(ctx, &endpoints, deletionPrecondition) + if client.IgnoreNotFound(err) == nil { + // The endpoint was deleted (even if not by us). We call that successful cleanup. + ha.log.V(app.VerbosityVerbose).Info("The endpoints object cleanup succeeded") + return nil + } + ha.log.V(app.VerbosityInfo).Info("Failed to delete the endpoints object", "error", err.Error()) + } + + // Deletion request failed, possibly because of a midair collision. Wait a bit and retry. + attempt++ + if attempt >= 10 { + break + } + // Stop waiting as soon as our own deadline expires: any further attempt would fail immediately anyway. + select { + case <-ctx.Done(): + return fmt.Errorf("HAService cleanup: deleting endpoints object: deadline exceeded, last error: %w", err) + case <-time.After(1 * time.Second): + } + } + + ha.log.V(app.VerbosityError).Error(err, "All retries to delete the endpoints object failed. 
Abandoning object.") + return fmt.Errorf("HAService cleanup: deleting endpoints object: retrying failed, last error: %w", err) +} + +// Does the endpoints object hold the same values as the ones we previously set to it? +func (ha *HAService) isEndpointStillPointingToOurReplica(endpoints *corev1.Endpoints) bool { + return len(endpoints.Subsets) == 1 && + len(endpoints.Subsets[0].Addresses) == 1 && + endpoints.Subsets[0].Addresses[0].IP == ha.servingIPAddress && + len(endpoints.Subsets[0].Ports) == 1 && + endpoints.Subsets[0].Ports[0].Port == int32(ha.servingPort) && + endpoints.Subsets[0].Ports[0].Protocol == corev1.ProtocolTCP } diff --git a/pkg/ha/ha_service_test.go b/pkg/ha/ha_service_test.go index 739c66a..825f4b3 100644 --- a/pkg/ha/ha_service_test.go +++ b/pkg/ha/ha_service_test.go @@ -6,6 +6,7 @@ package ha import ( "context" + "fmt" "sync/atomic" "time" @@ -13,6 +14,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -26,121 +28,154 @@ var _ = Describe("HAService", func() { testIPAddress = "1.2.3.4" testPort = 777 ) - - Describe("Start", func() { - It("should set the respective service endpoints ", func() { - // Arrange - manager := testutil.NewFakeManager() - ha := NewHAService(manager, testNs, testIPAddress, testPort, logr.Discard()) - - endpoints := &corev1.Endpoints{ + // Helper functions + var ( + makeEmptyEndpointsObject = func(namespace string) *corev1.Endpoints { + return &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: app.Name, - Namespace: ha.namespace, + Namespace: namespace, }, } - Expect(ha.manager.GetClient().Create(context.Background(), endpoints)).To(Succeed()) + } + arrange = func() (*HAService, *testutil.FakeManager, context.Context, context.CancelFunc) { + manager := testutil.NewFakeManager() + ha := NewHAService(manager, testNs, 
testIPAddress, testPort, logr.Discard()) + ctx, cancel := context.WithCancel(context.Background()) + return ha, manager, ctx, cancel + } + createEndpointsObjectOnServer = func(namespace string, client kclient.Client) { + endpoints := makeEmptyEndpointsObject(namespace) + Expect(client.Create(context.Background(), endpoints)).To(Succeed()) + } + waitGetChangedEndpoints = func(ha *HAService, actualEndpoints *corev1.Endpoints) error { + // This function returns nil if the endpoints object exists and has changed from its initial, unpopulated state + + err := ha.manager.GetClient().Get( + context.Background(), kclient.ObjectKey{Namespace: ha.namespace, Name: app.Name}, actualEndpoints) + if err != nil { + return err + } + if actualEndpoints.Subsets == nil { + return fmt.Errorf("endpoiints object not populated") + } + return nil + } + expectEndpointsPopulated = func(actualEndpoints *corev1.Endpoints) { + Expect(actualEndpoints.Labels).NotTo(BeNil()) + Expect(actualEndpoints.Labels["app"]).To(Equal(app.Name)) + Expect(actualEndpoints.Subsets).To(HaveLen(1)) + Expect(actualEndpoints.Subsets[0].Addresses).To(HaveLen(1)) + Expect(actualEndpoints.Subsets[0].Addresses[0].IP).To(Equal(testIPAddress)) + Expect(actualEndpoints.Subsets[0].Ports).To(HaveLen(1)) + Expect(actualEndpoints.Subsets[0].Ports[0].Port).To(Equal(int32(testPort))) + } + ) + + Describe("Start", func() { + It("should create/update the respective service endpoints object ", func() { + // Arrange + ha, _, ctx, cancel := arrange() + defer cancel() + // Real K8s API HTTP PUT does create/update and works fine if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // So, create an empty object in the fake client first. 
+ createEndpointsObjectOnServer(ha.namespace, ha.manager.GetClient()) + var err error // Act - err := ha.Start(context.Background()) + go func() { + err = ha.Start(ctx) + }() // Assert - Expect(err).To(Succeed()) - actual := corev1.Endpoints{} - manager.GetClient().Get(context.Background(), kclient.ObjectKey{Namespace: testNs, Name: app.Name}, &actual) - Expect(actual.Labels).NotTo(BeNil()) - Expect(actual.Labels["app"]).To(Equal(app.Name)) - Expect(actual.Subsets).To(HaveLen(1)) - Expect(actual.Subsets[0].Addresses).To(HaveLen(1)) - Expect(actual.Subsets[0].Addresses[0].IP).To(Equal(testIPAddress)) - Expect(actual.Subsets[0].Ports).To(HaveLen(1)) - Expect(actual.Subsets[0].Ports[0].Port).To(Equal(int32(testPort))) + actualEndpoints := makeEmptyEndpointsObject(ha.namespace) + Eventually(func() error { return waitGetChangedEndpoints(ha, actualEndpoints) }).Should(Succeed()) + Expect(err).NotTo(HaveOccurred()) + expectEndpointsPopulated(actualEndpoints) }) - It("should wait and retry with exponential backoff, if the service endpoints are missing, and succeed "+ - "once they appear", func() { - + It("should immediately abort retrying, if the context gets canceled", func() { // Arrange - manager := testutil.NewFakeManager() - ha := NewHAService(manager, testNs, testIPAddress, testPort, logr.Discard()) + ha, manager, ctx, cancel := arrange() + defer cancel() + var err error + var isComplete atomic.Bool + timeAfterChan := make(chan time.Time) - var timeAfterDuration atomic.Int64 - ha.testIsolation.TimeAfter = func(duration time.Duration) <-chan time.Time { - timeAfterDuration.Store(int64(duration)) + ha.testIsolation.TimeAfter = func(_ time.Duration) <-chan time.Time { return timeAfterChan } - var err error - var isComplete atomic.Bool // Act and assert go func() { - err = ha.Start(context.Background()) + err = ha.Start(ctx) isComplete.Store(true) }() - Consistently(isComplete.Load).Should(BeFalse()) - Expect(timeAfterDuration.Load()).To(Equal(int64(1 * time.Second))) - 
timeAfterChan <- time.Now() + // Real K8s API HTTP PUT does create/update and works fine if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // Here we rely on this failure to halt the progress of configuring the endpoints objects. + // In the real world, the cause of the faults would be different, but that should trigger the same retry + // mechanic. Consistently(isComplete.Load).Should(BeFalse()) - Expect(timeAfterDuration.Load()).To(Equal(int64(2 * time.Second))) - - endpoints := &corev1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: app.Name, - Namespace: ha.namespace, - }, - } - Expect(ha.manager.GetClient().Create(context.Background(), endpoints)).To(Succeed()) - - timeAfterChan <- time.Now() + cancel() Eventually(isComplete.Load).Should(BeTrue()) - Expect(err).To(Succeed()) + Expect(err).To(MatchError(ContainSubstring("canceled"))) actual := corev1.Endpoints{} - manager.GetClient().Get(context.Background(), kclient.ObjectKey{Namespace: testNs, Name: app.Name}, &actual) - Expect(actual.Subsets).To(HaveLen(1)) - Expect(actual.Subsets[0].Addresses).To(HaveLen(1)) - Expect(actual.Subsets[0].Addresses[0].IP).To(Equal(testIPAddress)) + err = manager.GetClient().Get(context.Background(), kclient.ObjectKey{Namespace: testNs, Name: app.Name}, &actual) + Expect(err).To(HaveOccurred()) }) - It("should immediately abort retrying, if the context gets canceled", func() { + It("should wait and retry with exponential backoff, if the service endpoints are missing, and succeed "+ + "once they appear", func() { + // Arrange - manager := testutil.NewFakeManager() - ha := NewHAService(manager, testNs, testIPAddress, testPort, logr.Discard()) + ha, _, ctx, cancel := arrange() + defer cancel() + var err error + var isComplete atomic.Bool timeAfterChan := make(chan time.Time) - ha.testIsolation.TimeAfter = func(_ time.Duration) <-chan time.Time { + var timeAfterDuration atomic.Int64 
+ ha.testIsolation.TimeAfter = func(duration time.Duration) <-chan time.Time { + timeAfterDuration.Store(int64(duration)) return timeAfterChan } - var err error - var isComplete atomic.Bool - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // Act and assert go func() { err = ha.Start(ctx) isComplete.Store(true) }() + // Real K8s API HTTP PUT does create/update and works fine if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // Here we rely on this failure to halt the progress of configuring the endpoints objects. + // In the real world, the cause of the faults would be different, but that should trigger the same retry + // mechanic. + Consistently(isComplete.Load).Should(BeFalse()) + Expect(timeAfterDuration.Load()).To(Equal(int64(1 * time.Second))) + timeAfterChan <- time.Now() Consistently(isComplete.Load).Should(BeFalse()) + Expect(timeAfterDuration.Load()).To(Equal(int64(2 * time.Second))) - cancel() - Eventually(isComplete.Load).Should(BeTrue()) - Expect(err).To(MatchError(ContainSubstring("canceled"))) - actual := corev1.Endpoints{} - err = manager.GetClient().Get(context.Background(), kclient.ObjectKey{Namespace: testNs, Name: app.Name}, &actual) - Expect(err).To(HaveOccurred()) + createEndpointsObjectOnServer(ha.namespace, ha.manager.GetClient()) + timeAfterChan <- time.Now() + + actualEndpoints := makeEmptyEndpointsObject(ha.namespace) + Eventually(func() error { return waitGetChangedEndpoints(ha, actualEndpoints) }).Should(Succeed()) + Expect(err).To(Succeed()) + expectEndpointsPopulated(actualEndpoints) }) It("should use exponential backoff", func() { - // Arrange - manager := testutil.NewFakeManager() - ha := NewHAService(manager, testNs, testIPAddress, testPort, logr.Discard()) + ha, _, ctx, cancel := arrange() + defer cancel() timeAfterChan := make(chan time.Time) var timeAfterDuration atomic.Int64 ha.testIsolation.TimeAfter = 
func(duration time.Duration) <-chan time.Time { @@ -150,9 +185,14 @@ var _ = Describe("HAService", func() { // Act and assert go func() { - ha.Start(context.Background()) + _ = ha.Start(ctx) }() + // Real K8s API HTTP PUT does create/update and works fine if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // Here we rely on this failure to halt the progress of configuring the endpoints objects. + // In the real world, the cause of the faults would be different, but that should trigger the same retry + // mechanic. expectedPeriod := 1 * time.Second expectedMax := 5 * time.Minute for i := 0; i < 20; i++ { @@ -165,5 +205,104 @@ var _ = Describe("HAService", func() { } Consistently(timeAfterDuration.Load).Should(Equal(int64(expectedMax))) }) + + It("should delete its service endpoint when context closes", func() { + // Arrange + ha, _, ctx, cancel := arrange() + defer cancel() + var err error + var isComplete atomic.Bool + // Real K8s API HTTP PUT does create/update and works fine if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // So, create an empty object in the fake client first. 
+ createEndpointsObjectOnServer(ha.namespace, ha.manager.GetClient()) + + // Act & assert + go func() { + err = ha.Start(ctx) + isComplete.Store(true) + }() + + // Wait for HAService to update the Endpoints object + actualEndpoints := makeEmptyEndpointsObject(ha.namespace) + Eventually(func() error { return waitGetChangedEndpoints(ha, actualEndpoints) }).Should(Succeed()) + + cancel() + + // Wait for HAService to delete the Endpoints object + Eventually(func() bool { + getErr := ha.manager.GetClient().Get( + context.Background(), kclient.ObjectKey{Namespace: ha.namespace, Name: app.Name}, actualEndpoints) + return apierrors.IsNotFound(getErr) + }).Should(BeTrue()) + + // Start() assigns err only when it returns. Wait for that, so reading err is neither racy nor a nil dereference. + Eventually(isComplete.Load).Should(BeTrue()) + Expect(err).To(MatchError(ContainSubstring("canceled"))) + }) + + It("upon exit, cleanup should not delete the service endpoint if it points to a different IP address", func() { + // Arrange + ha, _, ctx, cancel := arrange() + defer cancel() + var err error + var isComplete atomic.Bool + // Real K8s API HTTP PUT does create/update and works fine if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // So, create an empty object in the fake client first. 
+ createEndpointsObjectOnServer(ha.namespace, ha.manager.GetClient()) + + // Act & assert + go func() { + err = ha.Start(ctx) + isComplete.Store(true) + }() + + // Wait for HAService to update the Endpoints object + actualEndpoints := makeEmptyEndpointsObject(ha.namespace) + Eventually(func() error { return waitGetChangedEndpoints(ha, actualEndpoints) }).Should(Succeed()) + + // Modify the Endpoints object so it no longer points to our pod + actualEndpoints.Subsets[0].Addresses[0].IP = "1.1.1.1" + Expect(ha.manager.GetClient().Update(ctx, actualEndpoints)).To(Succeed()) + + cancel() + + // Make sure the HAService did not delete the Endpoints object + Eventually(isComplete.Load).Should(BeTrue()) + Expect(err.Error()).To(ContainSubstring("canceled")) + Expect( + ha.manager.GetClient().Get( + context.Background(), kclient.ObjectKey{Namespace: ha.namespace, Name: app.Name}, actualEndpoints)). + To(Succeed()) + }) + + It("upon exit, cleanup should succeed if endpoints object is deleted by an external actor", func() { + // Arrange + ha, _, ctx, cancel := arrange() + defer cancel() + var err error + var isComplete atomic.Bool + // Real K8s API HTTP PUT does create/update and works fine if the Endpoints object is missing. The update + // operation of the client type in the test fake library we use, fails if the object is missing. + // So, create an empty object in the fake client first. + createEndpointsObjectOnServer(ha.namespace, ha.manager.GetClient()) + + // Act & assert + go func() { + err = ha.Start(ctx) + isComplete.Store(true) + }() + + // Wait for HAService to update the Endpoints object + actualEndpoints := makeEmptyEndpointsObject(ha.namespace) + Eventually(func() error { return waitGetChangedEndpoints(ha, actualEndpoints) }).Should(Succeed()) + + // Delete the Endpoints object before triggering cleanup. Error should be "context canceled" and not e.g. 
"not found" + Expect(ha.manager.GetClient().Delete(ctx, actualEndpoints)).To(Succeed()) + cancel() + Eventually(isComplete.Load).Should(BeTrue()) + Expect(err.Error()).To(ContainSubstring("canceled")) + }) }) })