Comprehensive monitoring and observability are critical for the LLM Guardian Cluster's success. This system implements multi-layered monitoring that tracks everything from individual specialist performance to system-wide health metrics, enabling proactive issue detection and continuous optimization.
graph TB
subgraph "Data Collection Layer"
AGENTS[Monitoring Agents]
METRICS[Metrics Collectors]
LOGS[Log Aggregators]
TRACES[Distributed Tracing]
end
subgraph "LLM Guardian Cluster"
SPECIALISTS[Specialists]
GUARDIANS[Guardians]
ORCHESTRATOR[Orchestrator]
INFRASTRUCTURE[Infrastructure]
end
subgraph "Storage Layer"
PROMETHEUS[(Prometheus)]
ELASTICSEARCH[(Elasticsearch)]
JAEGER[(Jaeger)]
INFLUXDB[(InfluxDB)]
end
subgraph "Analysis Layer"
GRAFANA[Grafana Dashboards]
KIBANA[Kibana Logs]
ALERTMANAGER[Alert Manager]
ML_ANALYTICS[ML Analytics]
end
subgraph "Response Layer"
NOTIFICATIONS[Notifications]
AUTOMATION[Auto-remediation]
ESCALATION[Escalation]
end
SPECIALISTS --> AGENTS
GUARDIANS --> METRICS
ORCHESTRATOR --> LOGS
INFRASTRUCTURE --> TRACES
AGENTS --> PROMETHEUS
METRICS --> INFLUXDB
LOGS --> ELASTICSEARCH
TRACES --> JAEGER
PROMETHEUS --> GRAFANA
ELASTICSEARCH --> KIBANA
PROMETHEUS --> ALERTMANAGER
INFLUXDB --> ML_ANALYTICS
GRAFANA --> NOTIFICATIONS
ALERTMANAGER --> AUTOMATION
ML_ANALYTICS --> ESCALATION
Specialist Performance Metrics:
# llm_guardian_cluster/monitoring/specialist_metrics.py
from prometheus_client import Counter, Histogram, Gauge, Summary
import time
from contextlib import contextmanager
class SpecialistMetrics:
def __init__(self, specialist_type: str):
self.specialist_type = specialist_type
# Request metrics
self.requests_total = Counter(
'specialist_requests_total',
'Total requests processed by specialist',
['specialist_type', 'status', 'quality_tier']
)
self.request_duration = Histogram(
'specialist_request_duration_seconds',
'Time spent processing requests',
['specialist_type', 'complexity'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)
self.token_processing_rate = Gauge(
'specialist_tokens_per_second',
'Token processing rate',
['specialist_type']
)
# Quality metrics
self.quality_score = Histogram(
'specialist_quality_score',
'Response quality scores',
['specialist_type'],
buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)
self.confidence_score = Histogram(
'specialist_confidence_score',
'Response confidence scores',
['specialist_type'],
buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)
# Resource metrics
self.gpu_utilization = Gauge(
'specialist_gpu_utilization_percent',
'GPU utilization percentage',
['specialist_type', 'gpu_id']
)
self.memory_usage = Gauge(
'specialist_memory_usage_bytes',
'Memory usage in bytes',
['specialist_type', 'memory_type']
)
# Error metrics
self.errors_total = Counter(
'specialist_errors_total',
'Total errors encountered',
['specialist_type', 'error_type', 'severity']
)
self.timeout_count = Counter(
'specialist_timeouts_total',
'Total request timeouts',
['specialist_type']
)
@contextmanager
def request_timer(self, complexity: str = "medium"):
"""Context manager for timing requests"""
start_time = time.time()
try:
yield
status = "success"
except Exception as e:
status = "error"
self.errors_total.labels(
specialist_type=self.specialist_type,
error_type=type(e).__name__,
severity="high"
).inc()
raise
finally:
duration = time.time() - start_time
self.request_duration.labels(
specialist_type=self.specialist_type,
complexity=complexity
).observe(duration)
self.requests_total.labels(
specialist_type=self.specialist_type,
status=status,
quality_tier="unknown" # Will be updated later
).inc()
def record_quality_metrics(self, quality_score: float, confidence: float, quality_tier: str):
"""Record quality and confidence metrics"""
self.quality_score.labels(specialist_type=self.specialist_type).observe(quality_score)
self.confidence_score.labels(specialist_type=self.specialist_type).observe(confidence)
# Update request counter with quality tier
self.requests_total.labels(
specialist_type=self.specialist_type,
status="success",
quality_tier=quality_tier
).inc()
def update_resource_usage(self, gpu_utilization: dict, memory_usage: dict):
"""Update resource utilization metrics"""
for gpu_id, utilization in gpu_utilization.items():
self.gpu_utilization.labels(
specialist_type=self.specialist_type,
gpu_id=gpu_id
).set(utilization)
for memory_type, usage in memory_usage.items():
self.memory_usage.labels(
specialist_type=self.specialist_type,
memory_type=memory_type
).set(usage)Guardian Performance Metrics:
# llm_guardian_cluster/monitoring/guardian_metrics.py
class GuardianMetrics:
def __init__(self, guardian_type: str, specialist_id: str):
self.guardian_type = guardian_type
self.specialist_id = specialist_id
# Evaluation metrics
self.evaluations_total = Counter(
'guardian_evaluations_total',
'Total evaluations performed',
['guardian_type', 'specialist_id', 'evaluation_type']
)
self.evaluation_duration = Histogram(
'guardian_evaluation_duration_seconds',
'Time spent on evaluations',
['guardian_type', 'evaluation_type'],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0]
)
# Issue detection metrics
self.issues_detected = Counter(
'guardian_issues_detected_total',
'Issues detected by guardian',
['guardian_type', 'specialist_id', 'issue_type', 'severity']
)
self.false_positive_rate = Gauge(
'guardian_false_positive_rate',
'False positive rate for issue detection',
['guardian_type', 'issue_type']
)
# Improvement metrics
self.improvements_suggested = Counter(
'guardian_improvements_suggested_total',
'Improvements suggested by guardian',
['guardian_type', 'improvement_type', 'priority']
)
self.improvements_implemented = Counter(
'guardian_improvements_implemented_total',
'Improvements successfully implemented',
['guardian_type', 'improvement_type']
)
# Accuracy metrics
self.prediction_accuracy = Gauge(
'guardian_prediction_accuracy',
'Accuracy of guardian predictions',
['guardian_type', 'prediction_type']
)
def record_evaluation(self, evaluation_type: str, duration: float, issues_found: list):
"""Record evaluation metrics"""
self.evaluations_total.labels(
guardian_type=self.guardian_type,
specialist_id=self.specialist_id,
evaluation_type=evaluation_type
).inc()
self.evaluation_duration.labels(
guardian_type=self.guardian_type,
evaluation_type=evaluation_type
).observe(duration)
# Record detected issues
for issue in issues_found:
self.issues_detected.labels(
guardian_type=self.guardian_type,
specialist_id=self.specialist_id,
issue_type=issue.type,
severity=issue.severity
).inc()System Health Metrics:
# monitoring/prometheus/rules/infrastructure.yml
groups:
- name: infrastructure
rules:
# Node health
- record: node:cpu_utilization:rate5m
expr: 100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)
- record: node:memory_utilization:ratio
expr: 1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)
- record: node:disk_utilization:ratio
expr: 1 - (node_filesystem_avail_bytes / node_filesystem_size_bytes)
# GPU metrics
- record: gpu:utilization:avg5m
expr: avg_over_time(nvidia_smi_utilization_gpu_ratio[5m])
- record: gpu:memory_utilization:ratio
expr: nvidia_smi_memory_used_bytes / nvidia_smi_memory_total_bytes
# Network metrics
- record: network:throughput:rate5m
expr: rate(node_network_transmit_bytes_total[5m]) + rate(node_network_receive_bytes_total[5m])
# Database metrics
- record: postgresql:connections:utilization
expr: pg_stat_database_numbackends / pg_settings_max_connections
- record: postgresql:query_duration:p95
expr: histogram_quantile(0.95, pg_stat_statements_mean_time_bucket)

# llm_guardian_cluster/monitoring/logging.py
import structlog
import json
from datetime import datetime
from typing import Any, Dict
class StructuredLogger:
    """Structured JSON logger carrying per-component context.

    structlog configuration is process-global, so it is applied exactly once
    (guarded by the class-level _configured flag). Per-instance context
    (component, specialist_id, cluster metadata) is attached with .bind()
    rather than a globally registered processor: the original implementation
    registered a bound method of *this* instance as a processor, so the most
    recently constructed StructuredLogger leaked its context into every
    logger in the process.
    """

    # Process-wide guard: structlog.configure() must run only once.
    _configured = False

    def __init__(self, component: str, specialist_id: str = None):
        self.component = component
        self.specialist_id = specialist_id
        if not StructuredLogger._configured:
            structlog.configure(
                processors=[
                    structlog.stdlib.filter_by_level,
                    structlog.processors.TimeStamper(fmt="iso"),
                    structlog.processors.add_log_level,
                    structlog.processors.StackInfoRenderer(),
                    structlog.processors.JSONRenderer()
                ],
                context_class=dict,
                logger_factory=structlog.stdlib.LoggerFactory(),
                wrapper_class=structlog.stdlib.BoundLogger,
                cache_logger_on_first_use=True,
            )
            StructuredLogger._configured = True
        # Context bound here travels only with this instance's logger.
        self.logger = structlog.get_logger().bind(
            component=self.component,
            specialist_id=self.specialist_id,
            cluster_id="llm-guardian-cluster",
            version="1.2.3",
        )

    def log_request_start(self, request_id: str, query: str, context: Dict[str, Any]):
        """Log the start of request processing (sizes only, not content)."""
        self.logger.info(
            "Request processing started",
            request_id=request_id,
            query_length=len(query),
            context_size=len(json.dumps(context)),
            event_type="request_start"
        )

    def log_request_complete(self, request_id: str, duration: float,
                             quality_score: float, success: bool):
        """Log request processing completion with outcome and quality."""
        self.logger.info(
            "Request processing completed",
            request_id=request_id,
            duration=duration,
            quality_score=quality_score,
            success=success,
            event_type="request_complete"
        )

    def log_guardian_evaluation(self, guardian_type: str, evaluation_result: Dict[str, Any]):
        """Log a guardian evaluation summary (score, issue/recommendation counts)."""
        self.logger.info(
            "Guardian evaluation completed",
            guardian_type=guardian_type,
            evaluation_score=evaluation_result.get("score"),
            issues_detected=len(evaluation_result.get("issues", [])),
            recommendations=len(evaluation_result.get("recommendations", [])),
            event_type="guardian_evaluation"
        )

    def log_error(self, error: Exception, context: Dict[str, Any]):
        """Log an exception with its type, message and caller-supplied context."""
        self.logger.error(
            "Error occurred",
            error_type=type(error).__name__,
            error_message=str(error),
            context=context,
            event_type="error"
        )

    def log_performance_alert(self, metric: str, value: float, threshold: float):
        """Log a warning when a performance metric exceeds its threshold."""
        self.logger.warning(
            "Performance threshold exceeded",
            metric=metric,
            value=value,
            threshold=threshold,
            event_type="performance_alert"
        )

# logging/fluentd/fluentd.conf
<source>
@type tail
path /var/log/containers/llm-guardian-*.log
pos_file /var/log/fluentd-containers.log.pos
tag kubernetes.*
format json
time_key time
time_format %Y-%m-%dT%H:%M:%S.%NZ
</source>
<filter kubernetes.**>
@type kubernetes_metadata
@id filter_kube_metadata
kubernetes_url "#{ENV['KUBERNETES_SERVICE_HOST']}:#{ENV['KUBERNETES_SERVICE_PORT_HTTPS']}"
verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
ca_file "#{ENV['KUBERNETES_CA_FILE']}"
skip_labels false
skip_container_metadata false
skip_namespace_metadata false
skip_master_url false
</filter>
<filter kubernetes.**>
@type parser
key_name log
reserve_data true
<parse>
@type json
</parse>
</filter>
<match kubernetes.**>
@type elasticsearch
host elasticsearch.monitoring.svc.cluster.local
port 9200
index_name llm-guardian-logs
type_name _doc
include_tag_key true
tag_key @log_name
<buffer>
@type file
path /var/log/fluentd-buffers/kubernetes.system.buffer
flush_mode interval
retry_type exponential_backoff
flush_thread_count 2
flush_interval 5s
retry_forever
retry_max_interval 30
chunk_limit_size 2M
total_limit_size 500M
overflow_action block
</buffer>
</match>

# llm_guardian_cluster/monitoring/tracing.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
import functools
class TracingManager:
def __init__(self, service_name: str, jaeger_endpoint: str):
self.service_name = service_name
# Configure tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Configure Jaeger exporter
jaeger_exporter = JaegerExporter(
agent_host_name="jaeger-agent",
agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# Auto-instrument common libraries
RequestsInstrumentor().instrument()
Psycopg2Instrumentor().instrument()
self.tracer = tracer
def trace_request(self, operation_name: str):
"""Decorator for tracing request processing"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
with self.tracer.start_as_current_span(operation_name) as span:
# Add common attributes
span.set_attribute("service.name", self.service_name)
span.set_attribute("operation.name", operation_name)
try:
result = await func(*args, **kwargs)
span.set_attribute("operation.status", "success")
return result
except Exception as e:
span.set_attribute("operation.status", "error")
span.set_attribute("error.type", type(e).__name__)
span.set_attribute("error.message", str(e))
raise
return wrapper
return decorator
def trace_guardian_evaluation(self, guardian_type: str, specialist_id: str):
"""Decorator for tracing guardian evaluations"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
with self.tracer.start_as_current_span("guardian_evaluation") as span:
span.set_attribute("guardian.type", guardian_type)
span.set_attribute("specialist.id", specialist_id)
result = await func(*args, **kwargs)
span.set_attribute("evaluation.score", result.get("score", 0))
span.set_attribute("issues.count", len(result.get("issues", [])))
return result
return wrapper
return decoratorSystem Overview Dashboard:
{
"dashboard": {
"id": null,
"title": "LLM Guardian Cluster - System Overview",
"tags": ["llm-guardian", "overview"],
"timezone": "browser",
"panels": [
{
"id": 1,
"title": "Request Rate",
"type": "stat",
"targets": [
{
"expr": "sum(rate(specialist_requests_total[5m]))",
"legendFormat": "Requests/sec"
}
],
"fieldConfig": {
"defaults": {
"unit": "reqps",
"min": 0
}
},
"gridPos": { "h": 8, "w": 6, "x": 0, "y": 0 }
},
{
"id": 2,
"title": "Average Quality Score",
"type": "stat",
"targets": [
{
"expr": "sum(rate(specialist_quality_score_sum[5m])) / sum(rate(specialist_quality_score_count[5m]))",
"legendFormat": "Quality Score"
}
],
"fieldConfig": {
"defaults": {
"unit": "percent",
"min": 0,
"max": 1,
"thresholds": {
"steps": [
{ "color": "red", "value": 0 },
{ "color": "yellow", "value": 0.7 },
{ "color": "green", "value": 0.8 }
]
}
}
},
"gridPos": { "h": 8, "w": 6, "x": 6, "y": 0 }
},
{
"id": 3,
"title": "Response Time",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.50, rate(specialist_request_duration_seconds_bucket[5m]))",
"legendFormat": "50th percentile"
},
{
"expr": "histogram_quantile(0.95, rate(specialist_request_duration_seconds_bucket[5m]))",
"legendFormat": "95th percentile"
},
{
"expr": "histogram_quantile(0.99, rate(specialist_request_duration_seconds_bucket[5m]))",
"legendFormat": "99th percentile"
}
],
"yAxes": [
{
"unit": "s",
"min": 0
}
],
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 }
},
{
"id": 4,
"title": "Error Rate",
"type": "graph",
"targets": [
{
"expr": "rate(specialist_errors_total[5m])",
"legendFormat": "{{specialist_type}} - {{error_type}}"
}
],
"yAxes": [
{
"unit": "ops",
"min": 0
}
],
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }
},
{
"id": 5,
"title": "Resource Utilization",
"type": "graph",
"targets": [
{
"expr": "avg(specialist_gpu_utilization_percent)",
"legendFormat": "GPU Utilization"
},
{
"expr": "avg(node:cpu_utilization:rate5m)",
"legendFormat": "CPU Utilization"
},
{
"expr": "avg(node:memory_utilization:ratio) * 100",
"legendFormat": "Memory Utilization"
}
],
"yAxes": [
{
"unit": "percent",
"min": 0,
"max": 100
}
],
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }
}
]
}
}

Specialist Performance Dashboard:
{
"dashboard": {
"title": "Specialist Performance Analysis",
"panels": [
{
"title": "Quality Score Distribution",
"type": "heatmap",
"targets": [
{
"expr": "increase(specialist_quality_score_bucket[5m])",
"format": "heatmap",
"legendFormat": "{{le}}"
}
]
},
{
"title": "Token Processing Rate",
"type": "graph",
"targets": [
{
"expr": "specialist_tokens_per_second",
"legendFormat": "{{specialist_type}}"
}
]
},
{
"title": "Guardian Issue Detection",
"type": "table",
"targets": [
{
"expr": "increase(guardian_issues_detected_total[1h])",
"format": "table",
"instant": true
}
]
}
]
}
}

# monitoring/prometheus/rules/alerts.yml
groups:
- name: llm-guardian-alerts
rules:
# High error rate
- alert: HighErrorRate
expr: rate(specialist_errors_total[5m]) > 0.1
for: 2m
labels:
severity: warning
annotations:
summary: "High error rate detected for {{ $labels.specialist_type }}"
description: "Error rate is {{ $value | humanize }} errors/sec for specialist {{ $labels.specialist_type }}"
runbook_url: "https://docs.llm-guardian-cluster.com/runbooks/high-error-rate"
# Low quality scores
- alert: LowQualityScore
expr: (rate(specialist_quality_score_sum[10m]) / rate(specialist_quality_score_count[10m])) < 0.7
for: 5m
labels:
severity: critical
annotations:
summary: "Quality score below threshold for {{ $labels.specialist_type }}"
description: "Average quality score is {{ $value | humanizePercentage }} for specialist {{ $labels.specialist_type }}"
# High response time
- alert: HighResponseTime
expr: histogram_quantile(0.95, rate(specialist_request_duration_seconds_bucket[5m])) > 10
for: 3m
labels:
severity: warning
annotations:
summary: "High response time for {{ $labels.specialist_type }}"
description: "95th percentile response time is {{ $value | humanizeDuration }} for specialist {{ $labels.specialist_type }}"
# Resource utilization
- alert: HighGPUUtilization
expr: avg(specialist_gpu_utilization_percent) > 90
for: 10m
labels:
severity: warning
annotations:
summary: "High GPU utilization"
description: "GPU utilization is {{ $value | humanize }}% across the cluster"
- alert: HighMemoryUsage
expr: avg(node:memory_utilization:ratio) > 0.9
for: 5m
labels:
severity: critical
annotations:
summary: "High memory usage"
description: "Memory utilization is {{ $value | humanizePercentage }} across nodes"
# Guardian-specific alerts
- alert: GuardianEvaluationFailure
expr: increase(guardian_errors_total[5m]) > 5
for: 1m
labels:
severity: critical
annotations:
summary: "Guardian evaluation failures detected"
description: "Guardian {{ $labels.guardian_type }} has {{ $value }} failures in the last 5 minutes"
- alert: ImprovementBacklog
expr: count(improvement_suggestions{status="pending"}) > 50
for: 30m
labels:
severity: warning
annotations:
summary: "Large improvement suggestion backlog"
description: "There are {{ $value }} pending improvement suggestions"
# Database alerts
- alert: DatabaseConnectionLimit
expr: postgresql:connections:utilization > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "Database connection limit approaching"
description: "Database connection utilization is {{ $value | humanizePercentage }}"
- alert: SlowQueries
expr: postgresql:query_duration:p95 > 5
for: 3m
labels:
severity: warning
annotations:
summary: "Slow database queries detected"
description: "95th percentile query duration is {{ $value }}s"

# monitoring/alertmanager/alertmanager.yml
global:
smtp_smarthost: "smtp.gmail.com:587"
smtp_from: "alerts@llm-guardian-cluster.com"
route:
group_by: ["alertname", "cluster", "service"]
group_wait: 10s
group_interval: 10s
repeat_interval: 1h
receiver: "default"
routes:
- match:
severity: critical
receiver: "critical-alerts"
continue: true
- match:
severity: warning
receiver: "warning-alerts"
receivers:
- name: "default"
slack_configs:
- api_url: "YOUR_SLACK_WEBHOOK_URL"
channel: "#llm-guardian-alerts"
title: "LLM Guardian Cluster Alert"
text: |
{{ range .Alerts }}
*Alert:* {{ .Annotations.summary }}
*Description:* {{ .Annotations.description }}
*Severity:* {{ .Labels.severity }}
{{ end }}
- name: "critical-alerts"
email_configs:
- to: "oncall@company.com"
subject: "CRITICAL: LLM Guardian Cluster Alert"
body: |
{{ range .Alerts }}
Alert: {{ .Annotations.summary }}
Description: {{ .Annotations.description }}
Runbook: {{ .Annotations.runbook_url }}
{{ end }}
pagerduty_configs:
- service_key: "YOUR_PAGERDUTY_SERVICE_KEY"
description: "Critical LLM Guardian Cluster Alert"
- name: "warning-alerts"
slack_configs:
- api_url: "YOUR_SLACK_WEBHOOK_URL"
channel: "#llm-guardian-warnings"
title: "LLM Guardian Warning"

# llm_guardian_cluster/monitoring/ai_metrics.py
class AIWorkloadMetrics:
    """Prometheus metrics for AI workload characteristics: inference time,
    context-length distribution, and memory/cache/batch efficiency gauges.

    Collectors are cached at class level because prometheus_client registers
    them in the global registry; a second AIWorkloadMetrics instance would
    otherwise raise ``ValueError: Duplicated timeseries in CollectorRegistry``.
    """

    # Shared cache of already-registered collectors, keyed by metric name.
    _collectors = {}

    @classmethod
    def _collector(cls, factory, name, documentation, labelnames, **kwargs):
        """Create the named collector on first use; reuse it afterwards."""
        if name not in cls._collectors:
            cls._collectors[name] = factory(name, documentation, labelnames, **kwargs)
        return cls._collectors[name]

    def __init__(self):
        self.model_inference_time = self._collector(
            Histogram,
            'model_inference_duration_seconds',
            'Time spent on model inference',
            ['model_name', 'batch_size', 'sequence_length'],
            buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
        )
        self.context_length_distribution = self._collector(
            Histogram,
            'context_length_tokens',
            'Distribution of context lengths',
            ['specialist_type'],
            buckets=[100, 500, 1000, 2000, 4000, 8000, 16000, 32000]
        )
        self.memory_efficiency = self._collector(
            Gauge,
            'memory_efficiency_ratio',
            'Ratio of useful vs total memory usage',
            ['component', 'memory_type']
        )
        self.cache_hit_rate = self._collector(
            Gauge,
            'cache_hit_rate',
            'Cache hit rate for various components',
            ['cache_type', 'specialist_type']
        )
        self.batch_efficiency = self._collector(
            Gauge,
            'batch_processing_efficiency',
            'Efficiency of batch processing',
            ['specialist_type']
        )

    def record_inference_metrics(self, model_name: str, batch_size: int,
                                 sequence_length: int, duration: float):
        """Observe one inference duration, labelled by model and shape.

        batch_size and sequence_length become label values, hence str().
        NOTE(review): high-cardinality labels — consider bucketing these
        if batch sizes / sequence lengths vary widely.
        """
        self.model_inference_time.labels(
            model_name=model_name,
            batch_size=str(batch_size),
            sequence_length=str(sequence_length)
        ).observe(duration)

    def record_context_metrics(self, specialist_type: str, context_length: int):
        """Observe one context length (in tokens) for a specialist type."""
        self.context_length_distribution.labels(
            specialist_type=specialist_type
        ).observe(context_length)
# llm_guardian_cluster/monitoring/anomaly_detection.py
from sklearn.ensemble import IsolationForest
import numpy as np
from typing import List, Dict, Any
import pandas as pd
class AnomalyDetector:
    """Per-metric anomaly detection built on IsolationForest.

    One model is fitted per metric name; values are treated as independent
    one-dimensional samples.
    """

    def __init__(self, contamination: float = 0.1):
        # metric name -> fitted IsolationForest
        self.models = {}
        self.contamination = contamination
        # metric name -> training samples the model was fitted on
        self.training_data = {}

    def train_model(self, metric_name: str, training_data: List[float]):
        """Fit an IsolationForest for metric_name on historic values."""
        samples = np.array(training_data).reshape(-1, 1)
        forest = IsolationForest(contamination=self.contamination, random_state=42)
        forest.fit(samples)
        self.models[metric_name] = forest
        self.training_data[metric_name] = samples

    def detect_anomalies(self, metric_name: str, current_values: List[float]) -> List[bool]:
        """Flag each current value; an untrained metric yields all-False."""
        model = self.models.get(metric_name)
        if model is None:
            return [False] * len(current_values)
        samples = np.array(current_values).reshape(-1, 1)
        # IsolationForest.predict yields -1 for anomalies, 1 for normal.
        return [label < 0 for label in model.predict(samples)]

    def get_anomaly_score(self, metric_name: str, value: float) -> float:
        """Score a single value: 0 means normal, larger means more anomalous.

        Returns 0.0 when no model has been trained for metric_name.
        """
        model = self.models.get(metric_name)
        if model is None:
            return 0.0
        raw = model.decision_function([[value]])[0]
        # decision_function is negative for anomalies; flip sign, clamp at 0.
        return -raw if raw < 0 else 0