Skip to content

Commit 9e7f1dc

Browse files
authored
Merge branch 'knative-extensions:main' into chore/set-idle-timeout-unit-milliseconds
2 parents 7ab9f27 + 1371423 commit 9e7f1dc

File tree

66 files changed

+19294
-18066
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+19294
-18066
lines changed

control-plane/config/eventing-kafka-broker/200-controller/100-config-tracing.yaml

Lines changed: 0 additions & 59 deletions
This file was deleted.

control-plane/config/post-install/200-controller-cluster-role.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ metadata:
2020
name: knative-kafka-controller-post-install
2121
labels:
2222
app.kubernetes.io/version: devel
23-
rules: []
23+
rules: null

control-plane/pkg/apis/internalskafkaeventing/v1alpha1/consumer_group_lifecycle.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const (
2626
ConditionConsumerGroupConsumers apis.ConditionType = "Consumers"
2727
ConditionConsumerGroupConsumersScheduled apis.ConditionType = "ConsumersScheduled"
2828
ConditionAutoscaling apis.ConditionType = "Autoscaler"
29+
AutoscalerDisabled = "AutoscalerDisabled"
2930
// Labels
3031
KafkaChannelNameLabel = "kafkachannel-name"
3132
ConsumerLabelSelector = "kafka.eventing.knative.dev/metadata.uid"
@@ -98,7 +99,7 @@ func (cg *ConsumerGroup) MarkAutoscalerSucceeded() {
9899
}
99100

100101
func (cg *ConsumerGroup) MarkAutoscalerDisabled() {
101-
cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrueWithReason(ConditionAutoscaling, "Autoscaler is disabled", "")
102+
cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrueWithReason(ConditionAutoscaling, AutoscalerDisabled, "")
102103
}
103104

104105
func (cg *ConsumerGroup) MarkAutoscalerFailed(reason string, err error) error {

control-plane/pkg/apis/internalskafkaeventing/v1alpha1/consumer_group_types.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,18 @@ func (cg *ConsumerGroup) IsNotScheduled() bool {
232232
return cond.IsFalse() || cond.IsUnknown()
233233
}
234234

235+
func (cg *ConsumerGroup) IsAutoscalerNotDisabled() bool {
236+
condition := cg.Status.GetCondition(ConditionAutoscaling)
237+
238+
// If condition doesn't exist or is not true, it's not in "disabled" state
239+
if condition == nil || condition.Status != corev1.ConditionTrue {
240+
return true
241+
}
242+
243+
// If condition is True but reason is not "autoscaler is disabled", then it's actively enabled
244+
return condition.Reason != AutoscalerDisabled
245+
}
246+
235247
func (cg *ConsumerGroup) HasDeadLetterSink() bool {
236248
return hasDeadLetterSink(cg.Spec.Template.Spec.Delivery)
237249
}

control-plane/pkg/kafka/clientpool/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ func (c *client) Topics() ([]string, error) {
7979
return x, err
8080
}
8181

82+
func (c *client) PartitionNotReadable(topic string, partition int32) bool {
83+
return c.client.PartitionNotReadable(topic, partition)
84+
}
85+
8286
func (c *client) Partitions(topic string) ([]int32, error) {
8387
x, err := c.client.Partitions(topic)
8488
if c.isFatalError(err) {

control-plane/pkg/kafka/clientpool/clusteradmin.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ type clusterAdmin struct {
3535
onFatalError func(err error)
3636
}
3737

38+
func (a *clusterAdmin) Coordinator(group string) (*sarama.Broker, error) {
39+
//TODO implement me
40+
panic("implement me")
41+
}
42+
3843
func clusterAdminFromClient(saramaClient sarama.Client, makeClusterAdmin kafka.NewClusterAdminFromClientFunc) (*clusterAdmin, error) {
3944
c, ok := saramaClient.(*client)
4045
if !ok {
@@ -64,6 +69,14 @@ func (a *clusterAdmin) CreateTopic(topic string, detail *sarama.TopicDetail, val
6469
return err
6570
}
6671

72+
func (a *clusterAdmin) ElectLeaders(electionType sarama.ElectionType, m map[string][]int32) (map[string]map[int32]*sarama.PartitionResult, error) {
73+
x, err := a.clusterAdmin.ElectLeaders(electionType, m)
74+
if a.isFatalError(err) {
75+
a.onFatalError(err)
76+
}
77+
return x, err
78+
}
79+
6780
func (a *clusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error) {
6881
x, err := a.clusterAdmin.ListTopics()
6982
if a.isFatalError(err) {

control-plane/pkg/kafka/consumer_group_lag_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,14 @@ type saramaClientMock struct {
212212
closed bool
213213
}
214214

215+
func (s *saramaClientMock) ElectLeaders(electionType sarama.ElectionType, m map[string][]int32) (map[string]map[int32]*sarama.PartitionResult, error) {
216+
panic("implement me")
217+
}
218+
219+
func (s *saramaClientMock) PartitionNotReadable(topic string, partition int32) bool {
220+
panic("implement me")
221+
}
222+
215223
func (s *saramaClientMock) CreateACLs(acls []*sarama.ResourceAcls) error {
216224
panic("implement me")
217225
}

control-plane/pkg/kafka/testing/admin_mock.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,20 @@ type MockKafkaClusterAdmin struct {
6363
T *testing.T
6464
}
6565

66+
func (m *MockKafkaClusterAdmin) ElectLeaders(electionType sarama.ElectionType, m2 map[string][]int32) (map[string]map[int32]*sarama.PartitionResult, error) {
67+
if m.ErrorBrokenPipe {
68+
return nil, brokenPipeError{}
69+
}
70+
panic("implement me")
71+
}
72+
73+
func (m *MockKafkaClusterAdmin) Coordinator(group string) (*sarama.Broker, error) {
74+
if m.ErrorBrokenPipe {
75+
return nil, brokenPipeError{}
76+
}
77+
panic("implement me")
78+
}
79+
6680
func (m *MockKafkaClusterAdmin) CreateACLs(acls []*sarama.ResourceAcls) error {
6781
if m.ErrorBrokenPipe {
6882
return brokenPipeError{}

control-plane/pkg/kafka/testing/client_mock.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ func (m MockKafkaClient) Brokers() []*sarama.Broker {
6262
panic("implement me")
6363
}
6464

65+
func (m MockKafkaClient) PartitionNotReadable(topic string, partition int32) bool {
66+
panic("implement me")
67+
}
68+
6569
func (m MockKafkaClient) Broker(brokerID int32) (*sarama.Broker, error) {
6670
if m.ShouldFailBrokenPipe {
6771
return nil, brokenPipeError{}

control-plane/pkg/reconciler/consumergroup/consumergroup.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -211,11 +211,18 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.Consu
211211
}
212212
cg.MarkAutoscalerSucceeded()
213213
} else {
214-
// If KEDA is not installed or autoscaler feature disabled, do nothing
215-
cg.MarkAutoscalerDisabled()
216-
if err := r.deleteKedaObjects(ctx, cg); err != nil {
217-
return err
214+
215+
// If KEDA autoscaler feature is disabled, we need to check if it was enabled before
216+
// check if the conditoion is not yet AutoscalerDisabled, only than delete KEDA objects
217+
if cg.IsAutoscalerNotDisabled() {
218+
logger.Debugw("Deleting KEDA objects as autoscaler is being disabled")
219+
if err := r.deleteKedaObjects(ctx, cg); err != nil {
220+
return err
221+
}
218222
}
223+
224+
// Mark as disabled after successful deletion, or if the feature was already disabled
225+
cg.MarkAutoscalerDisabled()
219226
}
220227

221228
logger.Debugw("Reconciling consumers")

0 commit comments

Comments
 (0)