Merged
Changes from 4 commits

3 changes: 2 additions & 1 deletion cmd/activator/main.go
@@ -205,7 +205,8 @@ func main() {
logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint)
statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger)
defer statSink.Shutdown()
-	go activator.ReportStats(logger, statSink, statCh)
+	go activator.ReportStats(logger, statSink, statCh, mp)
+	go activator.AutoscalerConnectionStatusMonitor(ctx, logger, statSink, mp)

// Create and run our concurrency reporter
concurrencyReporter := activatorhandler.NewConcurrencyReporter(ctx, env.PodName, statCh, mp)
63 changes: 63 additions & 0 deletions pkg/activator/metrics.go
@@ -0,0 +1,63 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package activator

import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

var scopeName = "knative.dev/serving/pkg/activator"

type statReporterMetrics struct {
autoscalerReachable metric.Int64Gauge
autoscalerConnectionErrors metric.Int64Counter
}

func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics {
var (
m statReporterMetrics
err error
provider = mp
)

if provider == nil {
provider = otel.GetMeterProvider()
}

meter := provider.Meter(scopeName)

m.autoscalerReachable, err = meter.Int64Gauge(
Contributor

I think we should use counters with labels here instead of a gauge. If the connection is flaky, we might always happen to sample while the gauge reads 1 and never see the failures.
If we had a counter with result=success or result=error we would:

  • no longer miss any errors
  • be able to alert on the success rate, e.g. if it drops below 95% over the last 5 minutes (a rough sketch of this idea follows below)
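(Illustrative only, not part of this PR: a minimal Go sketch of the labeled-counter idea; the instrument name kn.activator.autoscaler.connection_checks and the recordCheck helper are hypothetical.)

package sketch

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// recordCheck increments a single counter on every connection check and tags
// the measurement with the outcome, so a success rate can be computed over
// any time window instead of sampling a gauge.
func recordCheck(ctx context.Context, checks metric.Int64Counter, err error) {
	result := "success"
	if err != nil {
		result = "error"
	}
	checks.Add(ctx, 1, metric.WithAttributes(attribute.String("result", result)))
}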

Member

This is a good question - @prashanthjos do you have an opinion here

I'm wondering if adding a second metric that tracks reconnects is sufficient to answer @linkvt's question

Contributor Author

Thanks for the feedback! I've implemented a hybrid approach:

  • Keep the gauge (kn.activator.autoscaler.reachable): it provides instant visibility into the current connection state
  • Add a counter (kn.activator.autoscaler.connection_errors_total): monotonically increasing, it tracks every error

This addresses @linkvt's concern:

  • The counter never misses errors (it accumulates rather than samples)
  • It enables rate-based alerting: rate(connection_errors_total[5m]) > threshold
  • The gauge still answers "is it reachable right now?" for real-time dashboards

The counter increments on every error event (both periodic health checks and send failures), so flaky connections will be fully captured.

Contributor

I understand the argument for keeping the gauge, sounds good to me 👍

With regard to the error counter, I did more research and found the following evaluation, which gives a recommendation and advises against the approach I proposed: https://promlabs.com/blog/2023/09/19/errors-successes-totals-which-metrics-should-i-expose-to-prometheus/#recommended-for-binary-outcomes-exposing-errors-and-totals

In summary:

  • Add a counter for total connection checks
  • Keep the counter for connection checks that resulted in errors (exists right now)
  • This allows you to track the absolute error rate as well as the relative error ratio, which in my experience is more important than an absolute number (a small sketch of this pattern follows below)
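(Illustrative only, not part of this PR: a minimal Go sketch of the errors-plus-totals pattern; the recordCheckResult helper and the second "checks" counter are hypothetical.)

package sketch

import (
	"context"

	"go.opentelemetry.io/otel/metric"
)

// recordCheckResult bumps a total-checks counter on every check and an error
// counter only on failures, so dashboards can derive both an absolute error
// rate and a relative error ratio (errors / checks).
func recordCheckResult(ctx context.Context, checks, connErrors metric.Int64Counter, err error) {
	checks.Add(ctx, 1)
	if err != nil {
		connErrors.Add(ctx, 1)
	}
}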

Contributor Author

The "errors + totals" pattern makes sense for variable-rate operations like HTTP requests.

However, for this connection health check, I'm not sure the totals counter adds value because:

  • The check frequency is fixed (every 5s), so the rate is predictable/constant
  • Any error is actionable; we don't need a ratio to decide whether the connection is healthy
  • The error counter alone lets us alert on rate(connection_errors_total[5m]) > 0, which captures "autoscaler became unreachable"

Contributor

Partially agree, and I also thought about that, but:

  • there can be multiple activator pods, so one error means something different with 1 activator than with 10
  • in a sufficiently large environment there are outages/hiccups at every level; network partitions, disk errors, memory errors, etc. are a statistical certainty at some point. Not every error is actionable, since errors are expected in large environments; this is also why error budgets are common.
  • following common patterns (error + total metrics) makes it easier for infra teams to write their alerts based on error budget burn rate.

This is at least my experience from working in some large environments handling PBs of data, including the German railway's sales platform, to give my statements some credibility 🙂.

Contributor Author (@prashanthjos, Jan 15, 2026)

@linkvt thank you for the detailed feedback and context; this is really helpful! Your points about multiple activator pods and error budgets in large environments make a lot of sense. Following the error + total pattern for burn-rate alerting is definitely the right approach for production observability.

Could I address this in a follow-up PR? I'd like to get the current changes merged first and then add the connection_checks_total counter alongside the existing connection_errors_total in a subsequent PR.

"kn.activator.autoscaler.reachable",
Member

Not a huge deal in my mind, but I'm curious whether we should make this generic.

For example, kn.activator.open_connections with a peer attribute of autoscaler (a rough sketch follows below).

Then, if the activator were ever to connect to anything else, we would just add a new peer value.
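(Illustrative only, not part of this PR: a minimal Go sketch of the generic-gauge idea; the kn.activator.open_connections name and the recordReachable helper are hypothetical.)

package sketch

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// recordReachable reports connection state on a generic gauge and tags the
// measurement with the remote peer, so other peers could reuse the same metric.
func recordReachable(ctx context.Context, openConns metric.Int64Gauge, reachable int64) {
	openConns.Record(ctx, reachable,
		metric.WithAttributes(attribute.String("peer", "autoscaler")))
}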

Member

And likewise tweak the metric name below

metric.WithDescription("Whether the autoscaler is reachable from the activator (1 = reachable, 0 = not reachable)"),
metric.WithUnit("{reachable}"),
)
if err != nil {
panic(err)
}

m.autoscalerConnectionErrors, err = meter.Int64Counter(
"kn.activator.autoscaler.connection_errors_total",
Member

According to the OTel documentation, "total" shouldn't be in the metric name:

https://opentelemetry.io/docs/specs/semconv/general/naming/#do-not-use-total

metric.WithDescription("Total number of autoscaler connection errors from the activator"),
metric.WithUnit("{error}"),
)
if err != nil {
panic(err)
}

return &m
}
55 changes: 50 additions & 5 deletions pkg/activator/stat_reporter.go
@@ -17,9 +17,18 @@ limitations under the License.
package activator

import (
"context"
"time"

"github.com/gorilla/websocket"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"knative.dev/serving/pkg/autoscaler/metrics"
asmetrics "knative.dev/serving/pkg/autoscaler/metrics"
)

const (
// connectionCheckInterval is how often to check the autoscaler connection status.
connectionCheckInterval = 5 * time.Second
)

// RawSender sends raw byte array messages with a message type
@@ -28,21 +37,57 @@ type RawSender interface {
SendRaw(msgType int, msg []byte) error
}

// StatusChecker checks the connection status.
type StatusChecker interface {
Status() error
}

// AutoscalerConnectionStatusMonitor periodically checks if the autoscaler is reachable
// and emits metrics and logs accordingly.
func AutoscalerConnectionStatusMonitor(ctx context.Context, logger *zap.SugaredLogger, conn StatusChecker, mp metric.MeterProvider) {
Contributor

I don't think we need this monitor, as the stats are already reported every second; see

const reportInterval = time.Second

This means errors would be detected there already.

Contributor Author

Good point! You're right that errors are already detected every second via ReportStats when stats are being sent. However, the monitor handles one edge case: when there's no traffic. If no requests are coming in, ConcurrencyReporter sends nothing (len(msgs) == 0), so ReportStats never calls SendRaw(), and we wouldn't detect a broken connection.

metrics := newStatReporterMetrics(mp)
ticker := time.NewTicker(connectionCheckInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := conn.Status(); err != nil {
logger.Errorw("Autoscaler is not reachable from activator.",
zap.Error(err))
metrics.autoscalerReachable.Record(context.Background(), 0)
metrics.autoscalerConnectionErrors.Add(context.Background(), 1)
} else {
metrics.autoscalerReachable.Record(context.Background(), 1)
}
}
}
}

// ReportStats sends any messages received on the source channel to the sink.
// The messages are sent on a goroutine to avoid blocking, which means that
// messages may arrive out of order.
-func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []metrics.StatMessage) {
+func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []asmetrics.StatMessage, mp metric.MeterProvider) {
metrics := newStatReporterMetrics(mp)
for sms := range source {
-	go func(sms []metrics.StatMessage) {
-		wsms := metrics.ToWireStatMessages(sms)
+	go func(sms []asmetrics.StatMessage) {
+		wsms := asmetrics.ToWireStatMessages(sms)
b, err := wsms.Marshal()
if err != nil {
logger.Errorw("Error while marshaling stats", zap.Error(err))
return
}

if err := sink.SendRaw(websocket.BinaryMessage, b); err != nil {
logger.Errorw("Error while sending stats", zap.Error(err))
logger.Errorw("Autoscaler is not reachable from activator. Stats were not sent.",
zap.Error(err),
zap.Int("stat_message_count", len(sms)))
metrics.autoscalerReachable.Record(context.Background(), 0)
metrics.autoscalerConnectionErrors.Add(context.Background(), 1)
} else {
metrics.autoscalerReachable.Record(context.Background(), 1)
}
}(sms)
}
71 changes: 70 additions & 1 deletion pkg/activator/stat_reporter_test.go
@@ -17,6 +17,8 @@ limitations under the License.
package activator

import (
"context"
"errors"
"testing"
"time"

@@ -43,7 +45,7 @@ func TestReportStats(t *testing.T) {
})

defer close(ch)
-	go ReportStats(logger, sink, ch)
+	go ReportStats(logger, sink, ch, nil)

inputs := [][]metrics.StatMessage{{{
Key: types.NamespacedName{Name: "first-a"},
@@ -95,3 +97,70 @@ type sendRawFunc func(msgType int, msg []byte) error
func (fn sendRawFunc) SendRaw(msgType int, msg []byte) error {
return fn(msgType, msg)
}

type statusCheckerFunc func() error

func (fn statusCheckerFunc) Status() error {
return fn()
}

func TestReportStatsSendFailure(t *testing.T) {
logger := logtesting.TestLogger(t)
ch := make(chan []metrics.StatMessage)

sendErr := errors.New("connection refused")
errorReceived := make(chan struct{})
sink := sendRawFunc(func(msgType int, msg []byte) error {
close(errorReceived)
return sendErr
})

defer close(ch)
go ReportStats(logger, sink, ch, nil)

// Send a stat message
ch <- []metrics.StatMessage{{
Key: types.NamespacedName{Name: "test-revision"},
}}

// Wait for the error to be processed
select {
case <-errorReceived:
// Success - the error path was executed
case <-time.After(2 * time.Second):
t.Fatal("SendRaw was not called within timeout")
}
}

func TestAutoscalerConnectionStatusMonitor(t *testing.T) {
tests := []struct {
name string
statusErr error
}{{
name: "connection established",
statusErr: nil,
}, {
name: "connection not established",
statusErr: errors.New("connection not established"),
}}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := logtesting.TestLogger(t)
ctx, cancel := context.WithCancel(context.Background())

checker := statusCheckerFunc(func() error {
return tt.statusErr
})

// Start the monitor
go AutoscalerConnectionStatusMonitor(ctx, logger, checker, nil)

// Wait for at least one check to complete
time.Sleep(6 * time.Second)

// Cancel context to stop the monitor
cancel()
})
}
}