From dce652db1cd16df2455b6349d8641b53b9e6c6f3 Mon Sep 17 00:00:00 2001 From: Prashanth Josyula Date: Tue, 23 Dec 2025 18:28:45 -0800 Subject: [PATCH 1/6] Add metric and logging for activator-autoscaler connectivity This change adds observability for the websocket connection between the activator and autoscaler components: - Add `activator_autoscaler_reachable` gauge metric (1=reachable, 0=not reachable) - Log ERROR when autoscaler is not reachable during stat sending - Add periodic connection status monitor (every 5s) to detect connection establishment failures - Add unit tests for the new AutoscalerConnectionStatusMonitor function The metric is recorded in two scenarios: 1. When SendRaw fails/succeeds during stat message sending 2. When the periodic status check detects connection not established This helps operators identify connectivity issues between activator and autoscaler that could impact autoscaling decisions. --- cmd/activator/main.go | 3 +- pkg/activator/metrics.go | 53 +++++++++++++++++++++++++++++ pkg/activator/stat_reporter.go | 53 ++++++++++++++++++++++++++--- pkg/activator/stat_reporter_test.go | 43 ++++++++++++++++++++++- 4 files changed, 145 insertions(+), 7 deletions(-) create mode 100644 pkg/activator/metrics.go diff --git a/cmd/activator/main.go b/cmd/activator/main.go index 8593ccbed5dd..b6e2cd963e8c 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -205,7 +205,8 @@ func main() { logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint) statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger) defer statSink.Shutdown() - go activator.ReportStats(logger, statSink, statCh) + go activator.ReportStats(logger, statSink, statCh, mp) + go activator.AutoscalerConnectionStatusMonitor(ctx, logger, statSink, mp) // Create and run our concurrency reporter concurrencyReporter := activatorhandler.NewConcurrencyReporter(ctx, env.PodName, statCh, mp) diff --git a/pkg/activator/metrics.go b/pkg/activator/metrics.go new file mode 100644 index 000000000000..21959703a02c --- /dev/null +++ b/pkg/activator/metrics.go @@ -0,0 +1,53 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package activator + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +var scopeName = "knative.dev/serving/pkg/activator" + +type statReporterMetrics struct { + autoscalerReachable metric.Int64Gauge +} + +func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics { + var ( + m statReporterMetrics + err error + provider = mp + ) + + if provider == nil { + provider = otel.GetMeterProvider() + } + + meter := provider.Meter(scopeName) + + m.autoscalerReachable, err = meter.Int64Gauge( + "activator_autoscaler_reachable", + metric.WithDescription("Whether the autoscaler is reachable from the activator (1 = reachable, 0 = not reachable)"), + metric.WithUnit("{reachable}"), + ) + if err != nil { + panic(err) + } + + return &m +} diff --git a/pkg/activator/stat_reporter.go b/pkg/activator/stat_reporter.go index bfd60719c955..a44894f1c823 100644 --- a/pkg/activator/stat_reporter.go +++ b/pkg/activator/stat_reporter.go @@ -17,9 +17,18 @@ limitations under the License. package activator import ( + "context" + "time" + "github.com/gorilla/websocket" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" - "knative.dev/serving/pkg/autoscaler/metrics" + asmetrics "knative.dev/serving/pkg/autoscaler/metrics" +) + +const ( + // connectionCheckInterval is how often to check the autoscaler connection status. + connectionCheckInterval = 5 * time.Second ) // RawSender sends raw byte array messages with a message type @@ -28,13 +37,42 @@ type RawSender interface { SendRaw(msgType int, msg []byte) error } +// StatusChecker checks the connection status. +type StatusChecker interface { + Status() error +} + +// AutoscalerConnectionStatusMonitor periodically checks if the autoscaler is reachable +// and emits metrics and logs accordingly. +func AutoscalerConnectionStatusMonitor(ctx context.Context, logger *zap.SugaredLogger, conn StatusChecker, mp metric.MeterProvider) { + metrics := newStatReporterMetrics(mp) + ticker := time.NewTicker(connectionCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := conn.Status(); err != nil { + logger.Errorw("Autoscaler is not reachable from activator.", + zap.Error(err)) + metrics.autoscalerReachable.Record(context.Background(), 0) + } else { + metrics.autoscalerReachable.Record(context.Background(), 1) + } + } + } +} + // ReportStats sends any messages received on the source channel to the sink. // The messages are sent on a goroutine to avoid blocking, which means that // messages may arrive out of order. -func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []metrics.StatMessage) { +func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []asmetrics.StatMessage, mp metric.MeterProvider) { + metrics := newStatReporterMetrics(mp) for sms := range source { - go func(sms []metrics.StatMessage) { - wsms := metrics.ToWireStatMessages(sms) + go func(sms []asmetrics.StatMessage) { + wsms := asmetrics.ToWireStatMessages(sms) b, err := wsms.Marshal() if err != nil { logger.Errorw("Error while marshaling stats", zap.Error(err)) @@ -42,7 +80,12 @@ func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []metr } if err := sink.SendRaw(websocket.BinaryMessage, b); err != nil { - logger.Errorw("Error while sending stats", zap.Error(err)) + logger.Errorw("Autoscaler is not reachable from activator. Stats were not sent.", + zap.Error(err), + zap.Int("stat_message_count", len(sms))) + metrics.autoscalerReachable.Record(context.Background(), 0) + } else { + metrics.autoscalerReachable.Record(context.Background(), 1) } }(sms) } diff --git a/pkg/activator/stat_reporter_test.go b/pkg/activator/stat_reporter_test.go index 785f28d1fa4b..8cec54f57f0b 100644 --- a/pkg/activator/stat_reporter_test.go +++ b/pkg/activator/stat_reporter_test.go @@ -17,6 +17,8 @@ limitations under the License. package activator import ( + "context" + "errors" "testing" "time" @@ -43,7 +45,7 @@ func TestReportStats(t *testing.T) { }) defer close(ch) - go ReportStats(logger, sink, ch) + go ReportStats(logger, sink, ch, nil) inputs := [][]metrics.StatMessage{{{ Key: types.NamespacedName{Name: "first-a"}, @@ -95,3 +97,42 @@ type sendRawFunc func(msgType int, msg []byte) error func (fn sendRawFunc) SendRaw(msgType int, msg []byte) error { return fn(msgType, msg) } + +type statusCheckerFunc func() error + +func (fn statusCheckerFunc) Status() error { + return fn() +} + +func TestAutoscalerConnectionStatusMonitor(t *testing.T) { + tests := []struct { + name string + statusErr error + }{{ + name: "connection established", + statusErr: nil, + }, { + name: "connection not established", + statusErr: errors.New("connection not established"), + }} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := logtesting.TestLogger(t) + ctx, cancel := context.WithCancel(context.Background()) + + checker := statusCheckerFunc(func() error { + return tt.statusErr + }) + + // Start the monitor + go AutoscalerConnectionStatusMonitor(ctx, logger, checker, nil) + + // Wait for at least one check to complete + time.Sleep(6 * time.Second) + + // Cancel context to stop the monitor + cancel() + }) + } +} From 1e54a4975d495b1f7c3ad418af49ab490cb8314d Mon Sep 17 00:00:00 2001 From: Prashanth Josyula Date: Tue, 30 Dec 2025 08:58:56 -0800 Subject: [PATCH 2/6] Correcting the metric name inline with knative standards --- pkg/activator/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/activator/metrics.go b/pkg/activator/metrics.go index 21959703a02c..3571462c752e 100644 --- a/pkg/activator/metrics.go +++ b/pkg/activator/metrics.go @@ -41,7 +41,7 @@ func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics { meter := provider.Meter(scopeName) m.autoscalerReachable, err = meter.Int64Gauge( - "activator_autoscaler_reachable", + "kn.activator.autoscaler.reachable", metric.WithDescription("Whether the autoscaler is reachable from the activator (1 = reachable, 0 = not reachable)"), metric.WithUnit("{reachable}"), ) From 0a216b591e0f6dcccfab77e41b336ee4226156fe Mon Sep 17 00:00:00 2001 From: Prashanth Josyula Date: Wed, 31 Dec 2025 16:43:40 -0800 Subject: [PATCH 3/6] Increasing the code coverage --- pkg/activator/stat_reporter_test.go | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/pkg/activator/stat_reporter_test.go b/pkg/activator/stat_reporter_test.go index 8cec54f57f0b..d480686d80a1 100644 --- a/pkg/activator/stat_reporter_test.go +++ b/pkg/activator/stat_reporter_test.go @@ -104,6 +104,37 @@ func (fn statusCheckerFunc) Status() error { return fn() } +func TestReportStatsSendFailure(t *testing.T) { + logger := logtesting.TestLogger(t) + ch := make(chan []metrics.StatMessage) + + sendErr := errors.New("connection refused") + errorReceived := make(chan struct{}) + sink := sendRawFunc(func(msgType int, msg []byte) error { + close(errorReceived) + return sendErr + }) + + defer close(ch) + go ReportStats(logger, sink, ch, nil) + + // Send a stat message + ch <- []metrics.StatMessage{{ + Key: types.NamespacedName{Name: "test-revision"}, + }} + + // Wait for the error to be processed + select { + case <-errorReceived: + // Success - the error path was executed + case <-time.After(2 * time.Second): + t.Fatal("SendRaw was not called within timeout") + } + + // Give some time for the goroutine to process the error and log + time.Sleep(100 * time.Millisecond) +} + func TestAutoscalerConnectionStatusMonitor(t *testing.T) { tests := []struct { name string From 59d3df8a2befe68acb70ae6161bd1cd82dbeccb6 Mon Sep 17 00:00:00 2001 From: Prashanth Josyula Date: Mon, 12 Jan 2026 22:16:07 -0800 Subject: [PATCH 4/6] Addressing review comments --- pkg/activator/metrics.go | 12 +++++++++++- pkg/activator/stat_reporter.go | 2 ++ pkg/activator/stat_reporter_test.go | 3 --- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/pkg/activator/metrics.go b/pkg/activator/metrics.go index 3571462c752e..ad1674b94143 100644 --- a/pkg/activator/metrics.go +++ b/pkg/activator/metrics.go @@ -24,7 +24,8 @@ import ( var scopeName = "knative.dev/serving/pkg/activator" type statReporterMetrics struct { - autoscalerReachable metric.Int64Gauge + autoscalerReachable metric.Int64Gauge + autoscalerConnectionErrors metric.Int64Counter } func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics { @@ -49,5 +50,14 @@ func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics { panic(err) } + m.autoscalerConnectionErrors, err = meter.Int64Counter( + "kn.activator.autoscaler.connection_errors_total", + metric.WithDescription("Total number of autoscaler connection errors from the activator"), + metric.WithUnit("{error}"), + ) + if err != nil { + panic(err) + } + return &m } diff --git a/pkg/activator/stat_reporter.go b/pkg/activator/stat_reporter.go index a44894f1c823..d59a6505e6ed 100644 --- a/pkg/activator/stat_reporter.go +++ b/pkg/activator/stat_reporter.go @@ -58,6 +58,7 @@ func AutoscalerConnectionStatusMonitor(ctx context.Context, logger *zap.SugaredL logger.Errorw("Autoscaler is not reachable from activator.", zap.Error(err)) metrics.autoscalerReachable.Record(context.Background(), 0) + metrics.autoscalerConnectionErrors.Add(context.Background(), 1) } else { metrics.autoscalerReachable.Record(context.Background(), 1) } @@ -84,6 +85,7 @@ func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []asme zap.Error(err), zap.Int("stat_message_count", len(sms))) metrics.autoscalerReachable.Record(context.Background(), 0) + metrics.autoscalerConnectionErrors.Add(context.Background(), 1) } else { metrics.autoscalerReachable.Record(context.Background(), 1) } diff --git a/pkg/activator/stat_reporter_test.go b/pkg/activator/stat_reporter_test.go index d480686d80a1..23cb1e6aecaa 100644 --- a/pkg/activator/stat_reporter_test.go +++ b/pkg/activator/stat_reporter_test.go @@ -130,9 +130,6 @@ func TestReportStatsSendFailure(t *testing.T) { case <-time.After(2 * time.Second): t.Fatal("SendRaw was not called within timeout") } - - // Give some time for the goroutine to process the error and log - time.Sleep(100 * time.Millisecond) } func TestAutoscalerConnectionStatusMonitor(t *testing.T) { From 485a37af6ddeec197800fdab1b155d9dabac5b0d Mon Sep 17 00:00:00 2001 From: Prashanth Josyula Date: Wed, 14 Jan 2026 17:46:08 -0800 Subject: [PATCH 5/6] Integrating the onConnect and onDisconnect calls --- cmd/activator/main.go | 6 ++-- pkg/activator/stat_reporter.go | 53 +++++++++-------------------- pkg/activator/stat_reporter_test.go | 44 ++++-------------------- 3 files changed, 26 insertions(+), 77 deletions(-) diff --git a/cmd/activator/main.go b/cmd/activator/main.go index b6e2cd963e8c..9167ad389078 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -203,10 +203,10 @@ func main() { // Open a WebSocket connection to the autoscaler. autoscalerEndpoint := "ws://" + pkgnet.GetServiceHostname("autoscaler", system.Namespace()) + autoscalerPort logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint) - statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger) + statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger, + activator.AutoscalerConnectionOptions(logger, mp)...) defer statSink.Shutdown() - go activator.ReportStats(logger, statSink, statCh, mp) - go activator.AutoscalerConnectionStatusMonitor(ctx, logger, statSink, mp) + go activator.ReportStats(logger, statSink, statCh) // Create and run our concurrency reporter concurrencyReporter := activatorhandler.NewConcurrencyReporter(ctx, env.PodName, statCh, mp) diff --git a/pkg/activator/stat_reporter.go b/pkg/activator/stat_reporter.go index d59a6505e6ed..e337aa49ed7f 100644 --- a/pkg/activator/stat_reporter.go +++ b/pkg/activator/stat_reporter.go @@ -18,59 +18,42 @@ package activator import ( "context" - "time" "github.com/gorilla/websocket" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + pkgwebsocket "knative.dev/pkg/websocket" asmetrics "knative.dev/serving/pkg/autoscaler/metrics" ) -const ( - // connectionCheckInterval is how often to check the autoscaler connection status. - connectionCheckInterval = 5 * time.Second -) - // RawSender sends raw byte array messages with a message type // (implemented by gorilla/websocket.Socket). type RawSender interface { SendRaw(msgType int, msg []byte) error } -// StatusChecker checks the connection status. -type StatusChecker interface { - Status() error -} - -// AutoscalerConnectionStatusMonitor periodically checks if the autoscaler is reachable -// and emits metrics and logs accordingly. -func AutoscalerConnectionStatusMonitor(ctx context.Context, logger *zap.SugaredLogger, conn StatusChecker, mp metric.MeterProvider) { +// AutoscalerConnectionOptions returns websocket connection options that handle +// connection status changes via callbacks. This enables real-time metric updates +// when the connection state changes, without polling. +func AutoscalerConnectionOptions(logger *zap.SugaredLogger, mp metric.MeterProvider) []pkgwebsocket.ConnectionOption { metrics := newStatReporterMetrics(mp) - ticker := time.NewTicker(connectionCheckInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if err := conn.Status(); err != nil { - logger.Errorw("Autoscaler is not reachable from activator.", - zap.Error(err)) - metrics.autoscalerReachable.Record(context.Background(), 0) - metrics.autoscalerConnectionErrors.Add(context.Background(), 1) - } else { - metrics.autoscalerReachable.Record(context.Background(), 1) - } - } + return []pkgwebsocket.ConnectionOption{ + pkgwebsocket.WithOnConnect(func() { + logger.Info("Autoscaler connection established") + metrics.autoscalerReachable.Record(context.Background(), 1) + }), + pkgwebsocket.WithOnDisconnect(func(err error) { + logger.Errorw("Autoscaler connection lost", zap.Error(err)) + metrics.autoscalerReachable.Record(context.Background(), 0) + metrics.autoscalerConnectionErrors.Add(context.Background(), 1) + }), } } // ReportStats sends any messages received on the source channel to the sink. // The messages are sent on a goroutine to avoid blocking, which means that // messages may arrive out of order. -func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []asmetrics.StatMessage, mp metric.MeterProvider) { - metrics := newStatReporterMetrics(mp) +func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []asmetrics.StatMessage) { for sms := range source { go func(sms []asmetrics.StatMessage) { wsms := asmetrics.ToWireStatMessages(sms) @@ -84,10 +67,6 @@ func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []asme logger.Errorw("Autoscaler is not reachable from activator. Stats were not sent.", zap.Error(err), zap.Int("stat_message_count", len(sms))) - metrics.autoscalerReachable.Record(context.Background(), 0) - metrics.autoscalerConnectionErrors.Add(context.Background(), 1) - } else { - metrics.autoscalerReachable.Record(context.Background(), 1) } }(sms) } diff --git a/pkg/activator/stat_reporter_test.go b/pkg/activator/stat_reporter_test.go index 23cb1e6aecaa..487fb0a007f6 100644 --- a/pkg/activator/stat_reporter_test.go +++ b/pkg/activator/stat_reporter_test.go @@ -17,7 +17,6 @@ limitations under the License. package activator import ( - "context" "errors" "testing" "time" @@ -45,7 +44,7 @@ func TestReportStats(t *testing.T) { }) defer close(ch) - go ReportStats(logger, sink, ch, nil) + go ReportStats(logger, sink, ch) inputs := [][]metrics.StatMessage{{{ Key: types.NamespacedName{Name: "first-a"}, @@ -98,12 +97,6 @@ func (fn sendRawFunc) SendRaw(msgType int, msg []byte) error { return fn(msgType, msg) } -type statusCheckerFunc func() error - -func (fn statusCheckerFunc) Status() error { - return fn() -} - func TestReportStatsSendFailure(t *testing.T) { logger := logtesting.TestLogger(t) ch := make(chan []metrics.StatMessage) @@ -116,7 +109,7 @@ func TestReportStatsSendFailure(t *testing.T) { }) defer close(ch) - go ReportStats(logger, sink, ch, nil) + go ReportStats(logger, sink, ch) // Send a stat message ch <- []metrics.StatMessage{{ @@ -132,35 +125,12 @@ func TestReportStatsSendFailure(t *testing.T) { } } -func TestAutoscalerConnectionStatusMonitor(t *testing.T) { - tests := []struct { - name string - statusErr error - }{{ - name: "connection established", - statusErr: nil, - }, { - name: "connection not established", - statusErr: errors.New("connection not established"), - }} - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - logger := logtesting.TestLogger(t) - ctx, cancel := context.WithCancel(context.Background()) - - checker := statusCheckerFunc(func() error { - return tt.statusErr - }) - - // Start the monitor - go AutoscalerConnectionStatusMonitor(ctx, logger, checker, nil) +func TestAutoscalerConnectionOptions(t *testing.T) { + logger := logtesting.TestLogger(t) - // Wait for at least one check to complete - time.Sleep(6 * time.Second) + opts := AutoscalerConnectionOptions(logger, nil) - // Cancel context to stop the monitor - cancel() - }) + if len(opts) != 2 { + t.Errorf("Expected 2 connection options, got %d", len(opts)) } } From b8ef1579a35bbf03bd32d2ba18fc5b27fe06bd0f Mon Sep 17 00:00:00 2001 From: Prashanth Josyula Date: Wed, 14 Jan 2026 21:32:28 -0800 Subject: [PATCH 6/6] Adding the peer attribute into the metric --- pkg/activator/metrics.go | 23 +++++++++++++++-------- pkg/activator/stat_reporter.go | 6 +++--- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/pkg/activator/metrics.go b/pkg/activator/metrics.go index ad1674b94143..802bd7c2c285 100644 --- a/pkg/activator/metrics.go +++ b/pkg/activator/metrics.go @@ -18,14 +18,21 @@ package activator import ( "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" ) var scopeName = "knative.dev/serving/pkg/activator" +// peerAttrKey is the attribute key for identifying the connection peer. +var peerAttrKey = attribute.Key("peer") + +// PeerAutoscaler is the attribute value for autoscaler connections. +var PeerAutoscaler = peerAttrKey.String("autoscaler") + type statReporterMetrics struct { - autoscalerReachable metric.Int64Gauge - autoscalerConnectionErrors metric.Int64Counter + reachable metric.Int64Gauge + connectionErrors metric.Int64Counter } func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics { @@ -41,18 +48,18 @@ func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics { meter := provider.Meter(scopeName) - m.autoscalerReachable, err = meter.Int64Gauge( - "kn.activator.autoscaler.reachable", - metric.WithDescription("Whether the autoscaler is reachable from the activator (1 = reachable, 0 = not reachable)"), + m.reachable, err = meter.Int64Gauge( + "kn.activator.reachable", + metric.WithDescription("Whether a peer is reachable from the activator (1 = reachable, 0 = not reachable)"), metric.WithUnit("{reachable}"), ) if err != nil { panic(err) } - m.autoscalerConnectionErrors, err = meter.Int64Counter( - "kn.activator.autoscaler.connection_errors_total", - metric.WithDescription("Total number of autoscaler connection errors from the activator"), + m.connectionErrors, err = meter.Int64Counter( + "kn.activator.connection_errors", + metric.WithDescription("Number of connection errors from the activator"), metric.WithUnit("{error}"), ) if err != nil { diff --git a/pkg/activator/stat_reporter.go b/pkg/activator/stat_reporter.go index e337aa49ed7f..1989bc89c53d 100644 --- a/pkg/activator/stat_reporter.go +++ b/pkg/activator/stat_reporter.go @@ -40,12 +40,12 @@ func AutoscalerConnectionOptions(logger *zap.SugaredLogger, mp metric.MeterProvi return []pkgwebsocket.ConnectionOption{ pkgwebsocket.WithOnConnect(func() { logger.Info("Autoscaler connection established") - metrics.autoscalerReachable.Record(context.Background(), 1) + metrics.reachable.Record(context.Background(), 1, metric.WithAttributes(PeerAutoscaler)) }), pkgwebsocket.WithOnDisconnect(func(err error) { logger.Errorw("Autoscaler connection lost", zap.Error(err)) - metrics.autoscalerReachable.Record(context.Background(), 0) - metrics.autoscalerConnectionErrors.Add(context.Background(), 1) + metrics.reachable.Record(context.Background(), 0, metric.WithAttributes(PeerAutoscaler)) + metrics.connectionErrors.Add(context.Background(), 1, metric.WithAttributes(PeerAutoscaler)) }), } }