Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/cloudwatch-mcp-server/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

## [0.0.5] - 2025-10-06

### Added

- Added tool to analyze CloudWatch Metric data

### Changed

- Updated Alarm recommendation tool to support CloudWatch Anomaly Detection Alarms

## [0.0.4] - 2025-07-11

### Changed
Expand Down
1 change: 1 addition & 0 deletions src/cloudwatch-mcp-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Alarm Recommendations - Suggests recommended alarm configurations for CloudWatch
* `get_metric_data` - Retrieves detailed CloudWatch metric data for any CloudWatch metric. Use this for general CloudWatch metrics that aren't specific to Application Signals. Provides ability to query any metric namespace, dimension, and statistic
* `get_metric_metadata` - Retrieves comprehensive metadata about a specific CloudWatch metric
* `get_recommended_metric_alarms` - Gets recommended alarms for a CloudWatch metric
* `analyze_metric` - Analyzes CloudWatch metric data to determine trend, seasonality, and statistical properties

### Tools for CloudWatch Alarms
* `get_active_alarms` - Identifies currently active CloudWatch alarms across the account
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class CompositeAlarmSummary(BaseModel):


class ActiveAlarmsResponse(BaseModel):
"""Response containing active CloudWatch alarms."""
"""Response containing active CloudWatch Alarms."""

metric_alarms: List[MetricAlarmSummary] = Field(
default_factory=list, description='List of active metric alarms'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ async def get_active_alarms(
Field(description='AWS region to query. Defaults to us-east-1.'),
] = 'us-east-1',
) -> ActiveAlarmsResponse:
"""Gets all CloudWatch alarms currently in ALARM state.
"""Gets all CloudWatch Alarms currently in ALARM state.
This tool retrieves all CloudWatch alarms that are currently in the ALARM state,
This tool retrieves all CloudWatch Alarms that are currently in the ALARM state,
including both metric alarms and composite alarms. Results are optimized for
LLM reasoning with summary-level information.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from awslabs.cloudwatch_mcp_server.cloudwatch_metrics.constants import COMPARISON_OPERATOR_ANOMALY
from awslabs.cloudwatch_mcp_server.cloudwatch_metrics.models import AnomalyDetectionAlarmThreshold
from typing import Any, Dict


logger = logging.getLogger(__name__)


class CloudFormationTemplateGenerator:
"""Generate CloudFormation JSON for CloudWatch Anomaly Detection Alarms."""

def _generate_metric_alarm_template(self, alarm_data: Dict[str, Any]) -> Dict[str, Any]:
"""Generate CFN template for a single CloudWatch Alarm."""
if not self._is_anomaly_detection_alarm(alarm_data):
return {}

# Validate required fields
if not alarm_data.get('metricName'):
raise ValueError(
'Metric Name is required to generate CloudFormation templates for Cloudwatch Alarms'
)
if not alarm_data.get('namespace'):
raise ValueError(
'Metric Namespace is required to generate CloudFormation templates for Cloudwatch Alarms'
)

# Process alarm data and add computed fields
formatted_data = self._format_anomaly_detection_alarm_data(alarm_data)

# Build resources dict
anomaly_detector_key = f'{formatted_data["resourceKey"]}AnomalyDetector'
alarm_key = f'{formatted_data["resourceKey"]}Alarm'

resources = {
anomaly_detector_key: {
'Type': 'AWS::CloudWatch::AnomalyDetector',
'Properties': {
'MetricName': formatted_data['metricName'],
'Namespace': formatted_data['namespace'],
'Stat': formatted_data['statistic'],
'Dimensions': formatted_data['dimensions'],
},
},
alarm_key: {
'Type': 'AWS::CloudWatch::Alarm',
'DependsOn': anomaly_detector_key,
'Properties': {
'AlarmDescription': formatted_data['alarmDescription'],
'Metrics': [
{
'Expression': f'ANOMALY_DETECTION_BAND(m1, {formatted_data["sensitivity"]})',
'Id': 'ad1',
},
{
'Id': 'm1',
'MetricStat': {
'Metric': {
'MetricName': formatted_data['metricName'],
'Namespace': formatted_data['namespace'],
'Dimensions': formatted_data['dimensions'],
},
'Stat': formatted_data['statistic'],
'Period': formatted_data['period'],
},
},
],
'EvaluationPeriods': formatted_data['evaluationPeriods'],
'DatapointsToAlarm': formatted_data['datapointsToAlarm'],
'ThresholdMetricId': 'ad1',
'ComparisonOperator': formatted_data['comparisonOperator'],
'TreatMissingData': formatted_data['treatMissingData'],
},
},
}

final_template = {
'AWSTemplateFormatVersion': '2010-09-09',
'Description': 'CloudWatch Alarms and Anomaly Detectors',
'Resources': resources,
}

return final_template

def _is_anomaly_detection_alarm(self, alarm_data: Dict[str, Any]) -> bool:
return alarm_data.get('comparisonOperator') == COMPARISON_OPERATOR_ANOMALY

def _format_anomaly_detection_alarm_data(self, alarm_data: Dict[str, Any]) -> Dict[str, Any]:
"""Sanitize alarm data and add computed fields."""
formatted_data = alarm_data.copy()

# Generate resource key from metric name and namespace
formatted_data['resourceKey'] = self._generate_resource_key(
metric_name=alarm_data.get('metricName', ''),
namespace=alarm_data.get('namespace', ''),
dimensions=alarm_data.get('dimensions', []),
)

# Process threshold value
threshold = alarm_data.get('threshold', {})
formatted_data['sensitivity'] = threshold.get(
'sensitivity', AnomalyDetectionAlarmThreshold.DEFAULT_SENSITIVITY
)

# Set defaults
formatted_data.setdefault(
'alarmDescription', 'CloudWatch Alarm generated by CloudWatch MCP server.'
)
formatted_data.setdefault('statistic', 'Average')
formatted_data.setdefault('period', 300)
formatted_data.setdefault('evaluationPeriods', 2)
formatted_data.setdefault('datapointsToAlarm', 2)
formatted_data.setdefault('comparisonOperator', COMPARISON_OPERATOR_ANOMALY)
formatted_data.setdefault('treatMissingData', 'missing')
formatted_data.setdefault('dimensions', [])

return formatted_data

def _generate_resource_key(self, metric_name: str, namespace: str, dimensions: list) -> str:
"""Generate CloudFormation resource key from metric components to act as logical id."""
# Strip AWS/ prefix from namespace (AWS CDK style)
clean_namespace = namespace.replace('AWS/', '')

# Add first dimension key and value for uniqueness if present
dimension_suffix = ''
if dimensions:
first_dim = dimensions[0]
dim_name = first_dim.get('Name', '')
dim_value = first_dim.get('Value', '')
dimension_suffix = f'{dim_name}{dim_value}'

resource_base = f'{clean_namespace}{metric_name}{dimension_suffix}'
return self._sanitize_resource_name(resource_base)

def _sanitize_resource_name(self, name: str) -> str:
"""Sanitize name for CloudFormation resource key."""
# Remove non-alphanumeric characters
sanitized = ''.join(c for c in name if c.isalnum())

# Ensure it starts with letter
if not sanitized or not sanitized[0].isalpha():
sanitized = 'Resource' + sanitized

# Truncate if too long
if len(sanitized) > 255:
sanitized = sanitized[:255]

return sanitized
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# CloudWatch MCP Server Constants

# Time constants
SECONDS_PER_MINUTE = 60
MINUTES_PER_HOUR = 60
HOURS_PER_DAY = 24
DAYS_PER_WEEK = 7

# Analysis constants
DEFAULT_ANALYSIS_PERIOD_MINUTES = 20160 # 2 weeks

# Threshold constants
COMPARISON_OPERATOR_ANOMALY = 'LessThanLowerOrGreaterThanUpperThreshold'

# Numerical stability
NUMERICAL_STABILITY_THRESHOLD = 1e-10
Loading