Skip to content

Commit 570bb5e

Browse files
authored
Merge pull request #5562 from phuhung273/instancestate-sdk-v2
🌱Migrate instancestate to AWS SDK v2
2 parents f885d92 + 7c72605 commit 570bb5e

File tree

19 files changed

+409
-4128
lines changed

19 files changed

+409
-4128
lines changed

controllers/awscluster_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ func (r *AWSClusterReconciler) reconcileDelete(ctx context.Context, clusterScope
228228

229229
if feature.Gates.Enabled(feature.EventBridgeInstanceState) {
230230
instancestateSvc := instancestate.NewService(clusterScope)
231-
if err := instancestateSvc.DeleteEC2Events(); err != nil {
231+
if err := instancestateSvc.DeleteEC2Events(ctx); err != nil {
232232
// Not deleting the events isn't critical to cluster deletion
233233
clusterScope.Error(err, "non-fatal: failed to delete EventBridge notifications")
234234
}
@@ -348,7 +348,7 @@ func (r *AWSClusterReconciler) reconcileNormal(ctx context.Context, clusterScope
348348

349349
if feature.Gates.Enabled(feature.EventBridgeInstanceState) {
350350
instancestateSvc := instancestate.NewService(clusterScope)
351-
if err := instancestateSvc.ReconcileEC2Events(); err != nil {
351+
if err := instancestateSvc.ReconcileEC2Events(ctx); err != nil {
352352
// non fatal error, so we continue
353353
clusterScope.Error(err, "non-fatal: failed to set up EventBridge")
354354
}

controllers/awsmachine_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ func (r *AWSMachineReconciler) reconcileDelete(ctx context.Context, machineScope
346346

347347
if feature.Gates.Enabled(feature.EventBridgeInstanceState) {
348348
instancestateSvc := instancestate.NewService(ec2Scope)
349-
instancestateSvc.RemoveInstanceFromEventPattern(instance.ID)
349+
instancestateSvc.RemoveInstanceFromEventPattern(ctx, instance.ID)
350350
}
351351

352352
// Check the instance state. If it's already shutting down or terminated,
@@ -558,7 +558,7 @@ func (r *AWSMachineReconciler) reconcileNormal(ctx context.Context, machineScope
558558

559559
if feature.Gates.Enabled(feature.EventBridgeInstanceState) {
560560
instancestateSvc := instancestate.NewService(ec2Scope)
561-
if err := instancestateSvc.AddInstanceToEventPattern(instance.ID); err != nil {
561+
if err := instancestateSvc.AddInstanceToEventPattern(ctx, instance.ID); err != nil {
562562
return ctrl.Result{}, errors.Wrap(err, "failed to add instance to Event Bridge instance state rule")
563563
}
564564
}

controlplane/eks/controllers/awsmanagedcontrolplane_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ func (r *AWSManagedControlPlaneReconciler) reconcileNormal(ctx context.Context,
364364

365365
if feature.Gates.Enabled(feature.EventBridgeInstanceState) {
366366
instancestateSvc := instancestate.NewService(managedScope)
367-
if err := instancestateSvc.ReconcileEC2Events(); err != nil {
367+
if err := instancestateSvc.ReconcileEC2Events(ctx); err != nil {
368368
// non fatal error, so we continue
369369
managedScope.Error(err, "non-fatal: failed to set up EventBridge")
370370
}

exp/instancestate/awsinstancestate_controller.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@ import (
2424
"sync"
2525
"time"
2626

27-
"github.com/aws/aws-sdk-go/aws"
28-
"github.com/aws/aws-sdk-go/aws/awserr"
29-
"github.com/aws/aws-sdk-go/service/sqs"
30-
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
27+
"github.com/aws/aws-sdk-go-v2/aws"
28+
"github.com/aws/aws-sdk-go-v2/service/sqs"
29+
sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
3130
"github.com/go-logr/logr"
3231
apierrors "k8s.io/apimachinery/pkg/api/errors"
3332
"k8s.io/klog/v2"
@@ -38,6 +37,7 @@ import (
3837

3938
infrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2"
4039
"sigs.k8s.io/cluster-api-provider-aws/v2/controllers"
40+
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/awserrors"
4141
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/scope"
4242
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/services/instancestate"
4343
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/logger"
@@ -52,7 +52,7 @@ const Ec2InstanceStateLabelKey = "ec2-instance-state"
5252
type AwsInstanceStateReconciler struct {
5353
client.Client
5454
Log logr.Logger
55-
sqsServiceFactory func() sqsiface.SQSAPI
55+
sqsServiceFactory func() instancestate.SQSAPI
5656
queueURLs sync.Map
5757
Endpoints []scope.ServiceEndpoint
5858
WatchFilterValue string
@@ -61,7 +61,7 @@ type AwsInstanceStateReconciler struct {
6161
// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=awsclusters,verbs=get;list;watch
6262
// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=awsmachines,verbs=get;list;watch
6363

64-
func (r *AwsInstanceStateReconciler) getSQSService(region string) (sqsiface.SQSAPI, error) {
64+
func (r *AwsInstanceStateReconciler) getSQSService(region string) (instancestate.SQSAPI, error) {
6565
if r.sqsServiceFactory != nil {
6666
return r.sqsServiceFactory(), nil
6767
}
@@ -99,7 +99,7 @@ func (r *AwsInstanceStateReconciler) Reconcile(ctx context.Context, req ctrl.Req
9999

100100
// retrieve queue URL if it isn't already tracked
101101
if _, ok := r.queueURLs.Load(awsCluster.Name); !ok {
102-
URL, err := r.getQueueURL(awsCluster)
102+
URL, err := r.getQueueURL(ctx, awsCluster)
103103
if err != nil {
104104
if queueNotFoundError(err) {
105105
return reconcile.Result{}, nil
@@ -129,7 +129,7 @@ func (r *AwsInstanceStateReconciler) watchQueuesForInstanceEvents() {
129129
awsClusterList := &infrav1.AWSClusterList{}
130130
if err := r.Client.List(ctx, awsClusterList); err == nil {
131131
for i, cluster := range awsClusterList.Items {
132-
if URL, err := r.getQueueURL(&awsClusterList.Items[i]); err == nil {
132+
if URL, err := r.getQueueURL(ctx, &awsClusterList.Items[i]); err == nil {
133133
r.queueURLs.Store(cluster.Name, queueParams{region: cluster.Spec.Region, URL: URL})
134134
}
135135
}
@@ -144,7 +144,7 @@ func (r *AwsInstanceStateReconciler) watchQueuesForInstanceEvents() {
144144
r.Log.Error(err, "unable to create SQS client")
145145
return
146146
}
147-
resp, err := sqsSvs.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String(qp.URL)})
147+
resp, err := sqsSvs.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{QueueUrl: aws.String(qp.URL)})
148148
if err != nil {
149149
r.Log.Error(err, "failed to receive messages")
150150
return
@@ -160,7 +160,7 @@ func (r *AwsInstanceStateReconciler) watchQueuesForInstanceEvents() {
160160
// TODO: handle errors during process message. We currently deletes the message regardless.
161161
r.processMessage(ctx, m)
162162

163-
_, err = sqsSvs.DeleteMessage(&sqs.DeleteMessageInput{
163+
_, err = sqsSvs.DeleteMessage(ctx, &sqs.DeleteMessageInput{
164164
QueueUrl: aws.String(qp.URL),
165165
ReceiptHandle: msg.ReceiptHandle,
166166
})
@@ -216,13 +216,13 @@ func (r *AwsInstanceStateReconciler) processMessage(ctx context.Context, msg mes
216216
}
217217

218218
// getQueueURL retrieves the SQS queue URL for a given cluster.
219-
func (r *AwsInstanceStateReconciler) getQueueURL(cluster *infrav1.AWSCluster) (string, error) {
219+
func (r *AwsInstanceStateReconciler) getQueueURL(ctx context.Context, cluster *infrav1.AWSCluster) (string, error) {
220220
sqsSvs, err := r.getSQSService(cluster.Spec.Region)
221221
if err != nil {
222222
return "", err
223223
}
224224
queueName := instancestate.GenerateQueueName(cluster.Name)
225-
resp, err := sqsSvs.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(queueName)})
225+
resp, err := sqsSvs.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{QueueName: aws.String(queueName)})
226226

227227
if err != nil {
228228
return "", err
@@ -232,12 +232,11 @@ func (r *AwsInstanceStateReconciler) getQueueURL(cluster *infrav1.AWSCluster) (s
232232
}
233233

234234
func queueNotFoundError(err error) bool {
235-
if aerr, ok := err.(awserr.Error); ok {
236-
if aerr.Code() == sqs.ErrCodeQueueDoesNotExist {
237-
return true
238-
}
235+
smithyErr := awserrors.ParseSmithyError(err)
236+
if smithyErr == nil {
237+
return false
239238
}
240-
return false
239+
return smithyErr.ErrorCode() == (&sqstypes.QueueDoesNotExist{}).ErrorCode()
241240
}
242241

243242
type queueParams struct {

exp/instancestate/awsinstancestate_controller_test.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ import (
2222
"testing"
2323
"time"
2424

25-
"github.com/aws/aws-sdk-go/aws"
26-
"github.com/aws/aws-sdk-go/service/sqs"
27-
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
25+
"github.com/aws/aws-sdk-go-v2/aws"
26+
"github.com/aws/aws-sdk-go-v2/service/sqs"
27+
sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
2828
"github.com/golang/mock/gomock"
2929
. "github.com/onsi/gomega"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -36,6 +36,7 @@ import (
3636

3737
infrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2"
3838
"sigs.k8s.io/cluster-api-provider-aws/v2/controllers"
39+
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/services/instancestate"
3940
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/services/instancestate/mock_sqsiface"
4041
)
4142

@@ -45,7 +46,7 @@ func TestAWSInstanceStateController(t *testing.T) {
4546
instanceStateReconciler = &AwsInstanceStateReconciler{
4647
Client: testEnv.Client,
4748
Log: ctrl.Log.WithName("controllers").WithName("AWSInstanceState"),
48-
sqsServiceFactory: func() sqsiface.SQSAPI {
49+
sqsServiceFactory: func() instancestate.SQSAPI {
4950
return sqsSvs
5051
},
5152
}
@@ -58,38 +59,38 @@ func TestAWSInstanceStateController(t *testing.T) {
5859
Name: "aws-cluster-1-instance-1",
5960
Namespace: "default",
6061
}
61-
sqsSvs.EXPECT().GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String("aws-cluster-1-queue")}).AnyTimes().
62+
sqsSvs.EXPECT().GetQueueUrl(gomock.Any(), &sqs.GetQueueUrlInput{QueueName: aws.String("aws-cluster-1-queue")}).AnyTimes().
6263
Return(&sqs.GetQueueUrlOutput{QueueUrl: aws.String("aws-cluster-1-url")}, nil)
63-
sqsSvs.EXPECT().GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String("aws-cluster-2-queue")}).AnyTimes().
64+
sqsSvs.EXPECT().GetQueueUrl(gomock.Any(), &sqs.GetQueueUrlInput{QueueName: aws.String("aws-cluster-2-queue")}).AnyTimes().
6465
Return(&sqs.GetQueueUrlOutput{QueueUrl: aws.String("aws-cluster-2-url")}, nil)
65-
sqsSvs.EXPECT().GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String("aws-cluster-3-queue")}).AnyTimes().
66+
sqsSvs.EXPECT().GetQueueUrl(gomock.Any(), &sqs.GetQueueUrlInput{QueueName: aws.String("aws-cluster-3-queue")}).AnyTimes().
6667
Return(&sqs.GetQueueUrlOutput{QueueUrl: aws.String("aws-cluster-3-url")}, nil)
67-
sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-1-url")}).AnyTimes().
68-
DoAndReturn(func(arg *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) {
68+
sqsSvs.EXPECT().ReceiveMessage(gomock.Any(), &sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-1-url")}).AnyTimes().
69+
DoAndReturn(func(ctx context.Context, arg *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
6970
m := &infrav1.AWSMachine{}
7071
lookupKey := types.NamespacedName{
7172
Namespace: failingMachineMeta.Namespace,
7273
Name: failingMachineMeta.Name,
7374
}
74-
err := k8sClient.Get(context.TODO(), lookupKey, m)
75+
err := k8sClient.Get(ctx, lookupKey, m)
7576
// start returning a message once the AWSMachine is available
7677
if err == nil {
7778
return &sqs.ReceiveMessageOutput{
78-
Messages: []*sqs.Message{{
79+
Messages: []sqstypes.Message{{
7980
ReceiptHandle: aws.String("message-receipt-handle"),
8081
Body: aws.String(messageBodyJSON),
8182
}},
8283
}, nil
8384
}
8485

85-
return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil
86+
return &sqs.ReceiveMessageOutput{Messages: []sqstypes.Message{}}, nil
8687
})
8788

88-
sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-2-url")}).AnyTimes().
89-
Return(&sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil)
90-
sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-3-url")}).AnyTimes().
91-
Return(&sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil)
92-
sqsSvs.EXPECT().DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: aws.String("aws-cluster-1-url"), ReceiptHandle: aws.String("message-receipt-handle")}).AnyTimes().
89+
sqsSvs.EXPECT().ReceiveMessage(gomock.Any(), &sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-2-url")}).AnyTimes().
90+
Return(&sqs.ReceiveMessageOutput{Messages: []sqstypes.Message{}}, nil)
91+
sqsSvs.EXPECT().ReceiveMessage(gomock.Any(), &sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-3-url")}).AnyTimes().
92+
Return(&sqs.ReceiveMessageOutput{Messages: []sqstypes.Message{}}, nil)
93+
sqsSvs.EXPECT().DeleteMessage(gomock.Any(), &sqs.DeleteMessageInput{QueueUrl: aws.String("aws-cluster-1-url"), ReceiptHandle: aws.String("message-receipt-handle")}).AnyTimes().
9394
Return(nil, nil)
9495

9596
g.Expect(testEnv.Manager.GetFieldIndexer().IndexField(context.Background(), &infrav1.AWSMachine{},

go.mod

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ require (
88
github.com/aws/amazon-vpc-cni-k8s v1.15.5
99
github.com/aws/aws-lambda-go v1.41.0
1010
github.com/aws/aws-sdk-go v1.55.5
11-
github.com/aws/aws-sdk-go-v2 v1.36.3
11+
github.com/aws/aws-sdk-go-v2 v1.36.5
1212
github.com/aws/aws-sdk-go-v2/config v1.27.11
1313
github.com/aws/aws-sdk-go-v2/credentials v1.17.11
1414
github.com/aws/aws-sdk-go-v2/service/autoscaling v1.52.4
1515
github.com/aws/aws-sdk-go-v2/service/eks v1.64.0
1616
github.com/aws/aws-sdk-go-v2/service/iam v1.32.0
1717
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1
1818
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6
19-
github.com/aws/smithy-go v1.22.2
19+
github.com/aws/smithy-go v1.22.4
2020
github.com/awslabs/goformation/v4 v4.19.5
2121
github.com/blang/semver v3.5.1+incompatible
2222
github.com/coreos/ignition v0.35.0
@@ -80,19 +80,21 @@ require (
8080
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 // indirect
8181
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
8282
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect
83-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
84-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
83+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 // indirect
84+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 // indirect
8585
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
86-
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect
86+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.36 // indirect
8787
github.com/aws/aws-sdk-go-v2/service/cloudformation v1.50.0 // indirect
8888
github.com/aws/aws-sdk-go-v2/service/ec2 v1.159.0 // indirect
89+
github.com/aws/aws-sdk-go-v2/service/eventbridge v1.39.3
8990
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
9091
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect
9192
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect
9293
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect
9394
github.com/aws/aws-sdk-go-v2/service/organizations v1.27.3 // indirect
9495
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6 // indirect
9596
github.com/aws/aws-sdk-go-v2/service/servicequotas v1.21.4 // indirect
97+
github.com/aws/aws-sdk-go-v2/service/sqs v1.38.8
9698
github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect
9799
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
98100
github.com/aymerick/douceur v0.2.0 // indirect

0 commit comments

Comments
 (0)