Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ func main() {
// Open a WebSocket connection to the autoscaler.
autoscalerEndpoint := "ws://" + pkgnet.GetServiceHostname("autoscaler", system.Namespace()) + autoscalerPort
logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint)
statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger)
statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger,
activator.AutoscalerConnectionOptions(logger, mp)...)
defer statSink.Shutdown()
go activator.ReportStats(logger, statSink, statCh)

Expand Down
63 changes: 63 additions & 0 deletions pkg/activator/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package activator

import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

var scopeName = "knative.dev/serving/pkg/activator"

type statReporterMetrics struct {
autoscalerReachable metric.Int64Gauge
autoscalerConnectionErrors metric.Int64Counter
}

func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics {
var (
m statReporterMetrics
err error
provider = mp
)

if provider == nil {
provider = otel.GetMeterProvider()
}

meter := provider.Meter(scopeName)

m.autoscalerReachable, err = meter.Int64Gauge(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use counters with labels here instead of a gauge. If the connection is flaky we might get the case where we always check when the gauge is 1.
If we have a counter with result=success or result=error we would:

  • not miss any errors anymore
  • could create an alert based on the success rate e.g. during the last 5 minutes if success rate is e.g. below 95%

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good question - @prashanthjos do you have an opinion here

I'm wondering if adding a second metric that tracks reconnects is sufficient to answer @linkvt's question

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback! I've implemented a hybrid approach:

Keep the gauge (kn.activator.autoscaler.reachable), provides instant visibility into current connection state
Add a counter (kn.activator.autoscaler.connection_errors_total), monotonically increasing, tracks every error
This addresses @linkvt's concern:

Counter never misses errors (accumulates, doesn't sample)
Enables rate-based alerting: rate(connection_errors_total[5m]) > threshold
Gauge still answers "is it reachable right now?" for real-time dashboards
The counter increments on every error event (both periodic health checks and send failures), so flaky connections will be fully captured.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the argument for keeping the gauge, sounds good to me 👍

With regards to the error counter, I did more research and found the following evaluation that gives a recommendation and also doesn't recommend the approach I proposed: https://promlabs.com/blog/2023/09/19/errors-successes-totals-which-metrics-should-i-expose-to-prometheus/#recommended-for-binary-outcomes-exposing-errors-and-totals

In summary:

  • Add a counter for total connection checks
  • Keep the counter for connection checks with errors (exists right now)
  • This allows you to track the absolute error rate but also the relative error ratio which is in my experience more important than an absolute number

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "errors + totals" pattern makes sense for variable-rate operations like HTTP requests.

However, for this connection health check, I'm not sure the totals counter adds value because:

  • Check frequency is fixed (every 5s) - the rate is predictable/constant
  • Any error is actionable, we don't need a ratio to decide if the connection is healthy
  • The error counter alone lets us alert on rate(connection_errors_total[5m]) > 0, which captures "autoscaler became unreachable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partially agree and also thought about that but:

  • there can be multiple activator pods so there is a difference if you have 1 error with 1 or 10 activators
  • there are outages/hickups in the environment on every level if your environment is sufficiently large, it's just a statistical certainty with network partitions, disk errors, memory errors, etc. happening at some point in time. Not every error is actionable as errors are expected in large environments, this is also why error budgets are common.
  • following common patterns (error + total metric) makes it easier for infra teams to write their alerts based on error budget burn rate.

This is at least my experience from working in some large environments handling PBs of data and working on the German railroads sales platform to give my statements some credibility 🙂 .

Copy link
Contributor Author

@prashanthjos prashanthjos Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@linkvt thank you for the detailed feedback and context, this is really helpful! Your points about multiple activator pods and error budgets in large environments make a lot of sense. Following the error + total pattern for burn rate alerting is definitely the right approach for production observability.

Could I accommodate this in a follow-up PR? I'd like to get the current changes merged first and then add the connection_checks_total counter alongside the existing connection_errors_total in a subsequent PR.

"kn.activator.autoscaler.reachable",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a huge deal in my mind but curious if we should make this generic.

For example kn.activator.open_connections and have a peer attribute of autoscaler

Thus if the activator were to ever connect to anything else we would just add a new peer attribute.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And likewise tweak the metric name below

metric.WithDescription("Whether the autoscaler is reachable from the activator (1 = reachable, 0 = not reachable)"),
metric.WithUnit("{reachable}"),
)
if err != nil {
panic(err)
}

m.autoscalerConnectionErrors, err = meter.Int64Counter(
"kn.activator.autoscaler.connection_errors_total",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to OTel documentation total shouldn't be in the name

https://opentelemetry.io/docs/specs/semconv/general/naming/#do-not-use-total

metric.WithDescription("Total number of autoscaler connection errors from the activator"),
metric.WithUnit("{error}"),
)
if err != nil {
panic(err)
}

return &m
}
34 changes: 29 additions & 5 deletions pkg/activator/stat_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package activator

import (
"context"

"github.com/gorilla/websocket"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"knative.dev/serving/pkg/autoscaler/metrics"
pkgwebsocket "knative.dev/pkg/websocket"
asmetrics "knative.dev/serving/pkg/autoscaler/metrics"
)

// RawSender sends raw byte array messages with a message type
Expand All @@ -28,21 +32,41 @@ type RawSender interface {
SendRaw(msgType int, msg []byte) error
}

// AutoscalerConnectionOptions returns websocket connection options that handle
// connection status changes via callbacks. This enables real-time metric updates
// when the connection state changes, without polling.
func AutoscalerConnectionOptions(logger *zap.SugaredLogger, mp metric.MeterProvider) []pkgwebsocket.ConnectionOption {
metrics := newStatReporterMetrics(mp)
return []pkgwebsocket.ConnectionOption{
pkgwebsocket.WithOnConnect(func() {
logger.Info("Autoscaler connection established")
metrics.autoscalerReachable.Record(context.Background(), 1)
}),
pkgwebsocket.WithOnDisconnect(func(err error) {
logger.Errorw("Autoscaler connection lost", zap.Error(err))
metrics.autoscalerReachable.Record(context.Background(), 0)
metrics.autoscalerConnectionErrors.Add(context.Background(), 1)
}),
}
}

// ReportStats sends any messages received on the source channel to the sink.
// The messages are sent on a goroutine to avoid blocking, which means that
// messages may arrive out of order.
func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []metrics.StatMessage) {
func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []asmetrics.StatMessage) {
for sms := range source {
go func(sms []metrics.StatMessage) {
wsms := metrics.ToWireStatMessages(sms)
go func(sms []asmetrics.StatMessage) {
wsms := asmetrics.ToWireStatMessages(sms)
b, err := wsms.Marshal()
if err != nil {
logger.Errorw("Error while marshaling stats", zap.Error(err))
return
}

if err := sink.SendRaw(websocket.BinaryMessage, b); err != nil {
logger.Errorw("Error while sending stats", zap.Error(err))
logger.Errorw("Autoscaler is not reachable from activator. Stats were not sent.",
zap.Error(err),
zap.Int("stat_message_count", len(sms)))
}
}(sms)
}
Expand Down
39 changes: 39 additions & 0 deletions pkg/activator/stat_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package activator

import (
"errors"
"testing"
"time"

Expand Down Expand Up @@ -95,3 +96,41 @@ type sendRawFunc func(msgType int, msg []byte) error
func (fn sendRawFunc) SendRaw(msgType int, msg []byte) error {
return fn(msgType, msg)
}

func TestReportStatsSendFailure(t *testing.T) {
logger := logtesting.TestLogger(t)
ch := make(chan []metrics.StatMessage)

sendErr := errors.New("connection refused")
errorReceived := make(chan struct{})
sink := sendRawFunc(func(msgType int, msg []byte) error {
close(errorReceived)
return sendErr
})

defer close(ch)
go ReportStats(logger, sink, ch)

// Send a stat message
ch <- []metrics.StatMessage{{
Key: types.NamespacedName{Name: "test-revision"},
}}

// Wait for the error to be processed
select {
case <-errorReceived:
// Success - the error path was executed
case <-time.After(2 * time.Second):
t.Fatal("SendRaw was not called within timeout")
}
}

func TestAutoscalerConnectionOptions(t *testing.T) {
logger := logtesting.TestLogger(t)

opts := AutoscalerConnectionOptions(logger, nil)

if len(opts) != 2 {
t.Errorf("Expected 2 connection options, got %d", len(opts))
}
}
Loading