Clean up Endpoints object #16
base: main
@@ -12,7 +12,8 @@ import (
 	"github.com/go-logr/logr"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	ctlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
@@ -68,7 +69,7 @@ func (ha *HAService) setEndpoints(ctx context.Context) error {
 	// Bypass client cache to avoid triggering a cluster wide list-watch for Endpoints - our RBAC does not allow it
 	err := ha.manager.GetAPIReader().Get(ctx, client.ObjectKey{Namespace: ha.namespace, Name: app.Name}, &endpoints)
 	if err != nil {
-		if !errors.IsNotFound(err) {
+		if !apierrors.IsNotFound(err) {
			return fmt.Errorf("updating the service endpoint to point to the new leader: retrieving endpoints: %w", err)
		}
@@ -98,6 +99,7 @@ func (ha *HAService) Start(ctx context.Context) error {
 		select {
 		case <-ctx.Done():
+			_ = ha.cleanUpServiceEndpoints()
 			return fmt.Errorf("starting HA service: %w", ctx.Err())
 		case <-ha.testIsolation.TimeAfter(retryPeriod):
 		}
@@ -108,5 +110,78 @@ func (ha *HAService) Start(ctx context.Context) error {
 		}
 	}

-	return nil
+	<-ctx.Done()
+	err := ha.cleanUpServiceEndpoints()
+	if err == nil {
+		err = ctx.Err()
+	}
+	return err
 }
+
+// cleanUpServiceEndpoints is executed upon ending leadership. Its purpose is to remove the Endpoints object created upon acquiring
+// leadership.
+func (ha *HAService) cleanUpServiceEndpoints() error {
+	// Use our own context. This function executes when the main application context is closed.
+	// Also, try to finish before a potential 15 seconds termination grace timeout.
Collaborator
Where do these 15s of potential termination grace timeout come from?

Contributor (Author)
15 is a nice, round number, and also half of the default 30. I'm speculating that upon a hypothetical future shortening of the grace period, 15 will be a likely choice (the other obvious choice being 10, of course). This is not a critical choice. I'm simply picking a value which is likely to work slightly better with potential future changes.

Collaborator
There is an option on the manager which allows specifying the termination grace period for all runnables: Either way, if you have a strong reason to not specify the default or not make it configurable, and keep it
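The manager option the reviewer refers to is presumably controller-runtime's GracefulShutdownTimeout. For reference, a minimal sketch of how it could be configured instead of hard-coding a timeout inside the cleanup function; the helper name and the 15-second value are illustrative assumptions, not code from this PR:

```go
package example

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
	ctlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
)

// newManagerWithGrace is a hypothetical helper: it sets one termination grace
// period for all runnables via the manager options, so individual runnables do
// not need to hard-code their own shutdown deadline. The 15s value is illustrative.
func newManagerWithGrace() (ctlmgr.Manager, error) {
	gracePeriod := 15 * time.Second
	return ctrl.NewManager(ctrl.GetConfigOrDie(), ctlmgr.Options{
		GracefulShutdownTimeout: &gracePeriod,
	})
}
```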
+	ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second)
+	defer cancel()
+	seedClient := ha.manager.GetClient()
Collaborator
Nit: I wouldn't call it a seedClient. In all other places we call it
Suggested change

Contributor (Author)
Agreed, but since this program is talking to multiple clusters, I don't want to use "client". I agree with your point that the name will need to change in the future, but at that time I'll also have the context which will allow me to come up with the right generalisation, without resorting to the excessively general (IMO) "client".
+	attempt := 0
+	var err error
+	for {
Collaborator
Can we use some of the

Contributor (Author)
I admit that it may have been better to use a poll function, but I don't think it's worth refactoring. Replacing a "for", with which everybody is familiar, with a callback-based function in a domain-specific library is a matter of preference.

Collaborator
I'd personally prefer to use

Collaborator
Imagine that we have (or introduce) a bug in the func by forgetting to break/exit in 1 case. This

Contributor (Author)
@ialidzhikov @plkokanov But the fact is that I haven't used Poll() before, so I can't really judge. I expect you're probably right and the improved readability justifies the refactoring. I'll use Poll().
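As a rough illustration of the Poll() approach the thread converges on, here is a sketch of how the retry loop could be expressed with the wait helpers. The names deleteWithRetry and tryDelete are hypothetical, the 1s/14s timings merely mirror the values used in this PR, and the exact helper depends on the apimachinery version (older releases expose wait.PollImmediate rather than wait.PollUntilContextTimeout):

```go
package example

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// deleteWithRetry is a hypothetical reshaping of the cleanup loop: each attempt
// becomes the condition callback, and the poll helper owns the timing and the
// exit conditions (success, timeout, or context cancellation).
func deleteWithRetry(ctx context.Context, tryDelete func(context.Context) (bool, error)) error {
	// Poll once immediately, then every second, until the overall timeout expires.
	return wait.PollUntilContextTimeout(ctx, 1*time.Second, 14*time.Second, true, tryDelete)
}
```

The callback would return (true, nil) once the Endpoints object is gone or abandoned, and (false, nil) to request another attempt.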
+		endpoints := corev1.Endpoints{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      app.Name,
+				Namespace: ha.namespace,
+			},
+		}
+		err = seedClient.Get(ctx, client.ObjectKeyFromObject(&endpoints), &endpoints)
+		if err != nil {
+			if apierrors.IsNotFound(err) {
+				ha.log.V(app.VerbosityVerbose).Info("The endpoints object cleanup succeeded: the object was missing")
+				return nil
+			}
+
+			ha.log.V(app.VerbosityInfo).Info("Failed to retrieve the endpoints object", "error", err.Error())
+		} else {
+			// Avoid data race. We don't want to delete the endpoint if it is sending traffic to a replica other than this one.
+			if !ha.isEndpointStillPointingToOurReplica(&endpoints) {
+				// Someone else is using the endpoint. We can't perform safe cleanup. Abandon the object.
+				ha.log.V(app.VerbosityWarning).Info(
+					"Abandoning endpoints object because it was modified by an external actor")
+				return nil
+			}
+
+			// Only delete the endpoint if it is the resource version for which we confirmed that it points to us.
+			deletionPrecondition := client.Preconditions{UID: &endpoints.UID, ResourceVersion: &endpoints.ResourceVersion}
+			err = seedClient.Delete(ctx, &endpoints, deletionPrecondition)
+			if client.IgnoreNotFound(err) == nil {
Comment on lines +158 to +159

Collaborator
Suggested change

Contributor (Author)
I'm intentionally keeping "delete with precondition" on a separate line here. It's an uncommon construct, which is likely to give the reader pause, and I don't want to force other logic onto the same line.
+				// The endpoint was deleted (even if not by us). We call that successful cleanup.
+				ha.log.V(app.VerbosityVerbose).Info("The endpoints object cleanup succeeded")
+				return nil
+			}
+			ha.log.V(app.VerbosityInfo).Info("Failed to delete the endpoints object", "error", err.Error())
+		}
+
+		// Deletion request failed, possibly because of a midair collision. Wait a bit and retry.
+		attempt++
+		if attempt >= 10 {
+			break
+		}
+		time.Sleep(1 * time.Second)
Collaborator
What about using a time.NewTicker, or better yet
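A small sketch of a context-aware delay that could replace the bare time.Sleep in the loop above, along the lines of the reviewer's suggestion; sleepCtx is a hypothetical helper, not code from this PR:

```go
package example

import (
	"context"
	"time"
)

// sleepCtx waits for the given duration, but returns early with the context's
// error if the cleanup context is cancelled or times out, so the retry loop
// never blocks past its deadline.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}
```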
+	}
+
+	ha.log.V(app.VerbosityError).Error(err, "All retries to delete the endpoints object failed. Abandoning object.")
+	return fmt.Errorf("HAService cleanup: deleting endpoints object: retrying failed, last error: %w", err)
+}
+
+// Does the endpoints object hold the same values as the ones we previously set to it?
+func (ha *HAService) isEndpointStillPointingToOurReplica(endpoints *corev1.Endpoints) bool {
+	return len(endpoints.Subsets) == 1 &&
+		len(endpoints.Subsets[0].Addresses) == 1 &&
+		endpoints.Subsets[0].Addresses[0].IP == ha.servingIPAddress &&
+		len(endpoints.Subsets[0].Ports) == 1 &&
+		endpoints.Subsets[0].Ports[0].Port == int32(ha.servingPort) &&
+		endpoints.Subsets[0].Ports[0].Protocol == corev1.ProtocolTCP
+}