Skip to content

Commit 4f920dd

Browse files
pierDipi and matzew authored
Fix KEDA scaling with SASL auth for brokers (#4253)
* Fix KEDA scaling with SASL auth for brokers

TriggerAuthentication is misconfigured when Broker is configured to connect to Kafka with SASL since it was using only the legacy secret format.

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Adding nil check for secret's data

Signed-off-by: Matthias Wessendorf <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Signed-off-by: Matthias Wessendorf <[email protected]>
Co-authored-by: Matthias Wessendorf <[email protected]>
1 parent 9c04ba4 commit 4f920dd

File tree

3 files changed

+170
-37
lines changed

3 files changed

+170
-37
lines changed

control-plane/pkg/autoscaler/keda/keda.go

Lines changed: 31 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -21,14 +21,15 @@ import (
2121
"fmt"
2222
"strconv"
2323

24+
"github.com/IBM/sarama"
2425
corev1 "k8s.io/api/core/v1"
2526
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26-
"k8s.io/utils/pointer"
2727
"knative.dev/pkg/kmeta"
2828
"knative.dev/pkg/logging"
2929

3030
bindings "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1"
3131
"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
32+
"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internalskafkaeventing"
3233
"knative.dev/eventing-kafka-broker/third_party/pkg/client/clientset/versioned"
3334

3435
"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler"
@@ -42,6 +43,9 @@ import (
4243
const (
4344
// AutoscalerClass is the KEDA autoscaler class.
4445
AutoscalerClass = "keda.autoscaling.knative.dev"
46+
47+
KedaResourceLabel = internalskafkaeventing.GroupName + "/resource"
48+
KedaResourceLabelValue = "true"
4549
)
4650

4751
func GenerateScaleTarget(cg *kafkainternals.ConsumerGroup) *kedav1alpha1.ScaleTarget {
@@ -94,8 +98,10 @@ func GenerateScaleTriggers(cg *kafkainternals.ConsumerGroup, triggerAuthenticati
9498
}
9599

96100
func GenerateTriggerAuthentication(cg *kafkainternals.ConsumerGroup, secretData map[string][]byte) (*kedav1alpha1.TriggerAuthentication, *corev1.Secret, error) {
97-
98-
secretTargetRefs := make([]kedav1alpha1.AuthSecretTargetRef, 0, 8)
101+
// Make sure secretData is never nil
102+
if secretData == nil {
103+
secretData = make(map[string][]byte)
104+
}
99105

100106
secret := corev1.Secret{
101107
ObjectMeta: metav1.ObjectMeta{
@@ -104,28 +110,38 @@ func GenerateTriggerAuthentication(cg *kafkainternals.ConsumerGroup, secretData
104110
OwnerReferences: []metav1.OwnerReference{
105111
*kmeta.NewControllerRef(cg),
106112
},
113+
Labels: map[string]string{
114+
KedaResourceLabel: KedaResourceLabelValue,
115+
},
107116
},
108117
Data: secretData,
109118
StringData: make(map[string]string),
110119
}
111120

112-
saslType := retrieveSaslTypeIfPresent(cg, secret)
121+
opt, err := security.NewSaramaSecurityOptionFromSecret(&secret)
122+
if err != nil {
123+
return nil, nil, fmt.Errorf("failed to get security option from secret: %w", err)
124+
}
113125

114-
if saslType != nil {
115-
switch *saslType {
116-
case "SCRAM-SHA-256":
126+
cfg := &sarama.Config{}
127+
if err := opt(cfg); err != nil {
128+
return nil, nil, fmt.Errorf("failed to get SASL config from secret: %w", err)
129+
}
130+
131+
if cfg.Net.SASL.Enable {
132+
switch cfg.Net.SASL.Mechanism {
133+
case sarama.SASLTypePlaintext:
134+
secret.StringData["sasl"] = "plaintext"
135+
case sarama.SASLTypeSCRAMSHA256:
117136
secret.StringData["sasl"] = "scram_sha256"
118-
case "SCRAM-SHA-512":
137+
case sarama.SASLTypeSCRAMSHA512:
119138
secret.StringData["sasl"] = "scram_sha512"
120-
case "PLAIN":
121-
secret.StringData["sasl"] = "plaintext"
122139
default:
123-
return nil, nil, fmt.Errorf("SASL type value %q is not supported", *saslType)
140+
return nil, nil, fmt.Errorf("SASL type value %q is not supported", cfg.Net.SASL.Mechanism)
124141
}
125-
} else {
126-
secret.StringData["sasl"] = "plaintext" //default
127142
}
128143

144+
secretTargetRefs := make([]kedav1alpha1.AuthSecretTargetRef, 0, 8)
129145
triggerAuth := &kedav1alpha1.TriggerAuthentication{
130146
ObjectMeta: metav1.ObjectMeta{
131147
Name: cg.Name,
@@ -134,7 +150,7 @@ func GenerateTriggerAuthentication(cg *kafkainternals.ConsumerGroup, secretData
134150
*kmeta.NewControllerRef(cg),
135151
},
136152
Labels: map[string]string{
137-
//TODO: may need to add labels like eventing-autoscaler-keda/pkg/reconciler/broker/resources/triggerauthentication.go#L39-L40
153+
KedaResourceLabel: KedaResourceLabelValue,
138154
},
139155
},
140156
Spec: kedav1alpha1.TriggerAuthenticationSpec{
@@ -169,7 +185,7 @@ func GenerateTriggerAuthentication(cg *kafkainternals.ConsumerGroup, secretData
169185

170186
if cg.Spec.Template.Spec.Auth.SecretSpec != nil && cg.Spec.Template.Spec.Auth.SecretSpec.Ref.Name != "" {
171187

172-
if saslType != nil { //SASL enabled
188+
if cfg.Net.SASL.Enable {
173189
sasl := kedav1alpha1.AuthSecretTargetRef{Parameter: "sasl", Name: secret.Name, Key: "sasl"}
174190
secretTargetRefs = append(secretTargetRefs, sasl)
175191

@@ -206,22 +222,6 @@ func GenerateTriggerAuthentication(cg *kafkainternals.ConsumerGroup, secretData
206222
return triggerAuth, &secret, nil
207223
}
208224

209-
func retrieveSaslTypeIfPresent(cg *kafkainternals.ConsumerGroup, secret corev1.Secret) *string {
210-
if cg.Spec.Template.Spec.Auth.NetSpec != nil && cg.Spec.Template.Spec.Auth.NetSpec.SASL.Enable && cg.Spec.Template.Spec.Auth.NetSpec.SASL.Type.SecretKeyRef != nil {
211-
secretKeyRefKey := cg.Spec.Template.Spec.Auth.NetSpec.SASL.Type.SecretKeyRef.Key
212-
if saslTypeValue, ok := secret.Data[secretKeyRefKey]; ok {
213-
return pointer.String(string(saslTypeValue))
214-
}
215-
}
216-
217-
if cg.Spec.Template.Spec.Auth.SecretSpec != nil && cg.Spec.Template.Spec.Auth.SecretSpec.Ref != nil {
218-
if saslTypeValue, ok := secret.Data[security.SaslTypeLegacy]; ok {
219-
return pointer.String(string(saslTypeValue))
220-
}
221-
}
222-
return nil
223-
}
224-
225225
func addAuthSecretTargetRef(parameter string, secretKeyRef bindings.SecretValueFromSource, secretTargetRefs []kedav1alpha1.AuthSecretTargetRef) []kedav1alpha1.AuthSecretTargetRef {
226226
if secretKeyRef.SecretKeyRef == nil || secretKeyRef.SecretKeyRef.Name == "" || secretKeyRef.SecretKeyRef.Key == "" {
227227
return secretTargetRefs

control-plane/pkg/security/secrets_provider_legacy_channel_secret.go

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -38,17 +38,18 @@ func ResolveAuthContextFromLegacySecret(s *corev1.Secret) (*NetSpecAuthContext,
3838
protocolStr, protocolContract := getProtocolFromLegacyChannelSecret(s)
3939

4040
virtualSecret := s.DeepCopy()
41+
4142
virtualSecret.Data[ProtocolKey] = []byte(protocolStr)
42-
if v, ok := virtualSecret.Data["sasltype"]; ok {
43+
if v, ok := virtualSecret.Data[SaslType]; ok {
4344
virtualSecret.Data[SaslMechanismKey] = v
4445
}
45-
if v, ok := virtualSecret.Data["saslType"]; ok {
46+
if v, ok := virtualSecret.Data[SaslTypeLegacy]; ok {
4647
virtualSecret.Data[SaslMechanismKey] = v
4748
}
48-
if v, ok := virtualSecret.Data["username"]; ok {
49+
if v, ok := virtualSecret.Data[SaslUsernameKey]; ok {
4950
virtualSecret.Data[SaslUserKey] = v
5051
}
51-
if v, ok := virtualSecret.Data["user"]; ok {
52+
if v, ok := virtualSecret.Data[SaslUserKey]; ok {
5253
virtualSecret.Data[SaslUserKey] = v
5354
}
5455

test/rekt/features/keda_scaling.go

Lines changed: 134 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,8 @@ import (
2222

2323
"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler/keda"
2424
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
25+
brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker"
26+
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkaauthsecret"
2527

2628
"knative.dev/eventing/test/rekt/resources/trigger"
2729

@@ -55,7 +57,7 @@ import (
5557
)
5658

5759
func KafkaSourceScaledObjectHasNoEmptyAuthRef() *feature.Feature {
58-
f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")
60+
f := feature.NewFeature()
5961

6062
// we need to ensure that autoscaling is enabled for the rest of the feature to work
6163
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())
@@ -93,7 +95,7 @@ func KafkaSourceScaledObjectHasNoEmptyAuthRef() *feature.Feature {
9395
}
9496

9597
func KafkaSourceScalesToZeroWithKeda() *feature.Feature {
96-
f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")
98+
f := feature.NewFeature()
9799

98100
// we need to ensure that autoscaling is enabled for the rest of the feature to work
99101
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())
@@ -142,6 +144,44 @@ func KafkaSourceScalesToZeroWithKeda() *feature.Feature {
142144
return f
143145
}
144146

147+
func KafkaSourceSASLScalesToZeroWithKeda() *feature.Feature {
148+
f := feature.NewFeature()
149+
150+
// we need to ensure that autoscaling is enabled for the rest of the feature to work
151+
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())
152+
153+
sourceCfg := kafkaSourceConfig{
154+
sourceName: feature.MakeRandomK8sName("kafka-source"),
155+
authMech: SASLMech,
156+
topic: feature.MakeRandomK8sName("kafka-source-keda-sasl"),
157+
}
158+
sinkCfg := kafkaSinkConfig{
159+
sinkName: feature.MakeRandomK8sName("kafka-sink"),
160+
}
161+
sinkName, receiver := KafkaSourceFeatureSetup(f, sourceCfg, sinkCfg)
162+
163+
sender := feature.MakeRandomK8sName("eventshub-sender")
164+
165+
event := cetest.FullEvent()
166+
event.SetID(uuid.New().String())
167+
168+
// check that the source initially has replicas = 0
169+
f.Setup("Source should start with replicas = 0", verifyConsumerGroupReplicas(getKafkaSourceCg(sourceCfg.sourceName), 0, true))
170+
171+
options := []eventshub.EventsHubOption{
172+
eventshub.StartSenderToResource(kafkasink.GVR(), sinkName),
173+
eventshub.InputEvent(event),
174+
}
175+
f.Requirement("install eventshub sender", eventshub.Install(sender, options...))
176+
177+
f.Requirement("eventshub receiver gets event", assert.OnStore(receiver).MatchEvent(test.HasId(event.ID())).Exact(1))
178+
179+
// after the event is sent, the source should scale down to zero replicas
180+
f.Alpha("KafkaSource").Must("Scale down to zero", verifyConsumerGroupReplicas(getKafkaSourceCg(sourceCfg.sourceName), 0, false))
181+
182+
return f
183+
}
184+
145185
func TriggerScalesToZeroWithKeda() *feature.Feature {
146186
f := feature.NewFeature()
147187

@@ -171,6 +211,98 @@ func TriggerScalesToZeroWithKeda() *feature.Feature {
171211
return f
172212
}
173213

214+
func TriggerSASLScalesToZeroWithKeda() *feature.Feature {
215+
f := feature.NewFeature()
216+
217+
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())
218+
219+
event := cetest.FullEvent()
220+
221+
brokerName := feature.MakeRandomK8sName("broker")
222+
triggerName := feature.MakeRandomK8sName("trigger")
223+
sourceName := feature.MakeRandomK8sName("source")
224+
sinkName := feature.MakeRandomK8sName("sink")
225+
brokerConfigName := feature.MakeRandomK8sName("brokercfg")
226+
authSecretName := feature.MakeRandomK8sName("kafkaauth")
227+
228+
// check that the trigger initially has replicas = 0
229+
f.Setup("Trigger should start with replicas = 0", verifyConsumerGroupReplicas(getTriggerCg(triggerName), 0, true))
230+
231+
f.Setup("Create auth secret", func(ctx context.Context, t feature.T) {
232+
kafkaauthsecret.Install(authSecretName, kafkaauthsecret.WithSslSaslScram512Data(ctx))(ctx, t)
233+
})
234+
235+
f.Setup("Create broker config", brokerconfigmap.Install(brokerConfigName,
236+
brokerconfigmap.WithNumPartitions(3),
237+
brokerconfigmap.WithReplicationFactor(3),
238+
brokerconfigmap.WithBootstrapServer(testpkg.BootstrapServersSslSaslScram),
239+
brokerconfigmap.WithAuthSecret(authSecretName)))
240+
241+
f.Setup("Install broker", broker.Install(brokerName, append(
242+
broker.WithEnvConfig(),
243+
broker.WithConfig(brokerConfigName))...,
244+
))
245+
246+
f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiver))
247+
f.Setup("install broker", broker.Install(brokerName))
248+
f.Setup("install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sinkName), "")))
249+
250+
f.Requirement("install source", eventshub.Install(sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEvent(event)))
251+
252+
f.Requirement("sink receives event", assert.OnStore(sinkName).MatchEvent(test.HasId(event.ID())).Exact(1))
253+
254+
//after the event is sent, the trigger should scale down to zero replicas
255+
f.Alpha("Trigger").Must("Scale down to zero", verifyConsumerGroupReplicas(getTriggerCg(triggerName), 0, false))
256+
257+
return f
258+
}
259+
260+
func TriggerSSLScalesToZeroWithKeda() *feature.Feature {
261+
f := feature.NewFeature()
262+
263+
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())
264+
265+
event := cetest.FullEvent()
266+
267+
brokerName := feature.MakeRandomK8sName("broker")
268+
triggerName := feature.MakeRandomK8sName("trigger")
269+
sourceName := feature.MakeRandomK8sName("source")
270+
sinkName := feature.MakeRandomK8sName("sink")
271+
brokerConfigName := feature.MakeRandomK8sName("brokercfg")
272+
authSecretName := feature.MakeRandomK8sName("kafkaauth")
273+
274+
// check that the trigger initially has replicas = 0
275+
f.Setup("Trigger should start with replicas = 0", verifyConsumerGroupReplicas(getTriggerCg(triggerName), 0, true))
276+
277+
f.Setup("Create auth secret", func(ctx context.Context, t feature.T) {
278+
kafkaauthsecret.Install(authSecretName, kafkaauthsecret.WithSslData(ctx))(ctx, t)
279+
})
280+
281+
f.Setup("Create broker config", brokerconfigmap.Install(brokerConfigName,
282+
brokerconfigmap.WithNumPartitions(3),
283+
brokerconfigmap.WithReplicationFactor(3),
284+
brokerconfigmap.WithBootstrapServer(testpkg.BootstrapServersSsl),
285+
brokerconfigmap.WithAuthSecret(authSecretName)))
286+
287+
f.Setup("Install broker", broker.Install(brokerName, append(
288+
broker.WithEnvConfig(),
289+
broker.WithConfig(brokerConfigName))...,
290+
))
291+
292+
f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiver))
293+
f.Setup("install broker", broker.Install(brokerName))
294+
f.Setup("install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithSubscriber(service.AsKReference(sinkName), "")))
295+
296+
f.Requirement("install source", eventshub.Install(sourceName, eventshub.StartSenderToResource(broker.GVR(), brokerName), eventshub.InputEvent(event)))
297+
298+
f.Requirement("sink receives event", assert.OnStore(sinkName).MatchEvent(test.HasId(event.ID())).Exact(1))
299+
300+
//after the event is sent, the trigger should scale down to zero replicas
301+
f.Alpha("Trigger").Must("Scale down to zero", verifyConsumerGroupReplicas(getTriggerCg(triggerName), 0, false))
302+
303+
return f
304+
}
305+
174306
func ChannelScalesToZeroWithKeda() *feature.Feature {
175307
f := feature.NewFeature()
176308

0 commit comments

Comments
 (0)