Skip to content

[Monitor-OpenTelemetry-Exporter] Add 15 Second Warmup for Long-Interval Statsbeat #41229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

- Extend version range for `psutil` to include 7.x
([#40459](https://github.com/Azure/azure-sdk-for-python/pull/40459))
- Add 15 second warmup for long-interval statsbeat metrics
([#41229](https://github.com/Azure/azure-sdk-for-python/pull/41229))

## 1.0.0b36 (2025-04-07)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
_DEFAULT_EU_STATS_CONNECTION_STRING = "InstrumentationKey=7dc56bab-3c0c-4e9f-9ebb-d1acadee8d0f;IngestionEndpoint=https://westeurope-5.in.applicationinsights.azure.com/"
_DEFAULT_STATS_SHORT_EXPORT_INTERVAL = 900 # 15 minutes
_DEFAULT_STATS_LONG_EXPORT_INTERVAL = 86400 # 24 hours
_INITIAL_DELAY_SECONDS = 60.0 # 15 second delay for the first long-interval export
_EU_ENDPOINTS = [
"westeurope",
"northeurope",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""
Internal module for statsbeat functionality in Azure Monitor OpenTelemetry exporter.
This module handles collection and exporting of statsbeat metrics.
"""
import logging
import threading
from threading import Timer

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource

from azure.monitor.opentelemetry.exporter._constants import _INITIAL_DELAY_SECONDS
from azure.monitor.opentelemetry.exporter.statsbeat._exporter import _StatsBeatExporter
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat_metrics import _StatsbeatMetrics
from azure.monitor.opentelemetry.exporter.statsbeat._state import (
Expand All @@ -18,14 +24,42 @@
_get_stats_short_export_interval,
)


_STATSBEAT_METRICS = None
_STATSBEAT_LOCK = threading.Lock()

def _delayed_export_statsbeat():
    """
    Force a flush of statsbeat metrics after the initial delay period has passed.

    Used as the ``threading.Timer`` callback scheduled by
    ``collect_statsbeat_metrics``. Returns without doing anything when statsbeat
    is marked as shut down, or when no statsbeat metrics object (or its meter
    provider) is available. Exceptions raised by the flush are logged at debug
    level and not re-raised, since this export is best-effort.
    """
    # Check if we're in a shutdown state
    with _STATSBEAT_STATE_LOCK:
        if _STATSBEAT_STATE["SHUTDOWN"]:
            return

    with _STATSBEAT_LOCK:
        metrics = _STATSBEAT_METRICS
        if metrics is None or metrics._meter_provider is None:  # pylint: disable=protected-access
            return
        try:
            # Trigger a forced export of the metrics after the delay
            metrics._meter_provider.force_flush()  # pylint: disable=protected-access
        except Exception:  # pylint: disable=broad-except
            # Catch Exception rather than using a bare except so SystemExit and
            # KeyboardInterrupt are not swallowed; record the failure so it can
            # be diagnosed instead of silently disappearing.
            logging.getLogger(__name__).debug(
                "Delayed statsbeat export failed.", exc_info=True
            )

# pylint: disable=global-statement
# pylint: disable=protected-access
def collect_statsbeat_metrics(exporter) -> None:
"""
Initialize and collect statsbeat metrics from the exporter.

Sets up a periodic metric reader to export metrics and initializes required
metrics for collecting statistics about the exporter's behavior.

:param exporter: The exporter instance to collect metrics from. Contains information
about instrumentation key, endpoint, and other configuration details.
:type exporter: ~azure.monitor.opentelemetry.exporter.AzureMonitorLogExporter or
~azure.monitor.opentelemetry.exporter.AzureMonitorTraceExporter or
~azure.monitor.opentelemetry.exporter.AzureMonitorMetricExporter
"""
global _STATSBEAT_METRICS
# Only start statsbeat if it did not exist before
if _STATSBEAT_METRICS is None:
Expand All @@ -36,15 +70,17 @@ def collect_statsbeat_metrics(exporter) -> None:
)
reader = PeriodicExportingMetricReader(
statsbeat_exporter,
export_interval_millis=_get_stats_short_export_interval() * 1000, # 15m by default
export_interval_millis=_get_stats_short_export_interval() * 1000,
)
mp = MeterProvider(
metric_readers=[reader],
resource=Resource.get_empty(),
)
# long_interval_threshold represents how many collects for short interval
# should have passed before a long interval collect
long_interval_threshold = _get_stats_long_export_interval() // _get_stats_short_export_interval()
long_export = _get_stats_long_export_interval()
short_export = _get_stats_short_export_interval()
long_interval_threshold = long_export // short_export
_STATSBEAT_METRICS = _StatsbeatMetrics(
mp,
exporter._instrumentation_key,
Expand All @@ -54,13 +90,22 @@ def collect_statsbeat_metrics(exporter) -> None:
exporter._credential is not None,
exporter._distro_version,
)
# Export some initial stats on program start
mp.force_flush()
# initialize non-initial stats
_STATSBEAT_METRICS.init_non_initial_metrics()

# Schedule a second export after the initial delay to send feature, instrumentation,
# and attach statsbeat metrics (which have a 15-second delay)
timer = Timer(_INITIAL_DELAY_SECONDS, _delayed_export_statsbeat)
timer.daemon = True # Set as daemon so it doesn't block program exit
timer.start()

def shutdown_statsbeat_metrics() -> None:
"""
Shut down the statsbeat metrics collection system.

This ensures proper cleanup of resources and marks the system as shut down
to prevent further metric collection.
"""
global _STATSBEAT_METRICS
shutdown_success = False
if _STATSBEAT_METRICS is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import re
import sys
import threading
import time
from typing import Any, Dict, Iterable, List

import requests # pylint: disable=networking-import-outside-azure-core-transport
Expand All @@ -18,6 +19,7 @@
from azure.monitor.opentelemetry.exporter._constants import (
_ATTACH_METRIC_NAME,
_FEATURE_METRIC_NAME,
_INITIAL_DELAY_SECONDS,
_KUBERNETES_SERVICE_HOST,
_REQ_DURATION_NAME,
_REQ_EXCEPTION_NAME,
Expand Down Expand Up @@ -138,6 +140,9 @@ def __init__(
_FEATURE_METRIC_NAME[0]: sys.maxsize,
}
self._long_interval_lock = threading.Lock()
# Add startup timestamp and delay for initial statsbeat metrics
self._startup_time = time.time()
self._initial_delay_seconds = _INITIAL_DELAY_SECONDS # 15 second delay for initial metrics
_StatsbeatMetrics._COMMON_ATTRIBUTES["cikey"] = instrumentation_key
if _utils._is_attach_enabled():
_StatsbeatMetrics._COMMON_ATTRIBUTES["attach"] = _AttachTypes.INTEGRATED
Expand Down Expand Up @@ -266,6 +271,7 @@ def _get_feature_metric(self, options: CallbackOptions) -> Iterable[Observation]
return observations

def _meets_long_interval_threshold(self, name) -> bool:
# For feature and attach metrics, check if the initial delay has passed
with self._long_interval_lock:
# if long interval threshold not met, it is not time to export
# statsbeat metrics that are long intervals
Expand Down
Loading