Serverless (#16)
* feat - support-serverless


---------

Co-authored-by: Prashant Agrawal <[email protected]>
prashanttct07 and Prashant-AtAWS authored Feb 13, 2023
1 parent 0ec91c4 commit e728948
Showing 4 changed files with 413 additions and 177 deletions.
254 changes: 222 additions & 32 deletions CWMetricsToOpenSearch/handler.py
@@ -20,23 +20,30 @@
import es_sink.flushing_buffer

# Lambda Interval Settings (seconds)
LAMBDA_INTERVAL=60

# This structure details the metrics available per domain. Domain names are unique
# by region, but not globally, so the identifier includes the domain name and region
# pair. The metric_descriptions are a collection of SingleMetricDescriptions, which
# provide the dimensions to pass to CloudWatch to retrieve the values for the
# metric.
DomainMetricsAvailable = namedtuple('DomainMetricsAvailable',
('region', 'domain_name', 'metric_descriptions'))
SingleMetricDescription = namedtuple('SingleMetricDescription',
('metric_name', 'dims'))


SingleMetricValue = namedtuple('SingleMetricValue',
('region', 'domain_name', 'metric_name', 'stat',
'value', 'timestamp'))

CollectionMetricsAvailable = namedtuple('CollectionMetricsAvailable',
('region', 'collection_id', 'metric_descriptions'))

SingleMetricValueCollection = namedtuple('SingleMetricValueCollection',
('region', 'collection_name', 'collection_id', 'index_name', 'metric_name', 'stat',
'value', 'timestamp'))
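
# Illustrative example (hypothetical values) of a single serverless metric
# value as carried through this module:
#   SingleMetricValueCollection(region='us-east-1', collection_name='logs',
#                               collection_id='abc123xyz', index_name='app-logs',
#                               metric_name='SearchRequestRate', stat='Average',
#                               value=1.5, timestamp='2023-02-13T00:00:00+00:00')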


################################################################################
# Environment
@@ -46,16 +53,16 @@
DOMAIN_ADMIN_UNAME = os.environ['DOMAIN_ADMIN_UNAME']
DOMAIN_ADMIN_PW = os.environ['DOMAIN_ADMIN_PW']
REGIONS = json.loads(os.environ['REGIONS'])

SERVERLESS_REGIONS = json.loads(os.environ['SERVERLESS_REGIONS'])
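# Illustrative example of the expected environment (values are hypothetical);
# REGIONS and SERVERLESS_REGIONS are JSON-encoded lists of region names:
#   DOMAIN_ADMIN_UNAME='admin'
#   DOMAIN_ADMIN_PW='<admin password>'
#   REGIONS='["us-east-1", "us-west-2"]'
#   SERVERLESS_REGIONS='["us-east-1"]'
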
################################################################################
# Timestamp tracking

def get_last_timestamp_ddb(domain_name, region):
ddb = boto3.client('dynamodb')
try:
ret = ddb.get_item(TableName=DDB_TABLE,
Key={'domain': {'S': domain_name},
'region': {'S': region}})
if not ret or not ret.get('Item', None):
return None
iso_ts = ret['Item'].get('Timestamp', None)
@@ -102,12 +109,13 @@ def get_last_timestamp(domain_name, region):


################################################################################
# Domain tracking;

def list_all_domains():
''' Loops through the list of REGIONS, listing out all domains for this
account in each region. Returns a dict mapping region to a list of domain names.
'''
print("Started processing for list_all_domains")
doms = {}
for region in REGIONS:
es = boto3.client('es', region)
@@ -120,10 +128,28 @@ def list_all_domains():
print(e)
return doms

# Collection tracking

def list_all_collections():
''' Loops through the list of SERVERLESS_REGIONS, listing out all OpenSearch
Serverless collections for this account in each region. Returns a dict
mapping region to a list of collection ids.
'''
print("Started processing for list_all_collections")
cols = {}
for region in SERVERLESS_REGIONS:
aoss = boto3.client('opensearchserverless', region)
try:
resp = aoss.list_collections()
resp = resp['collectionSummaries']
cols[region] = [val['id'] for val in resp]
except Exception as e:
print('Failed to get collection ids in region: {}'.format(region))
print(e)
return cols
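
# For reference, list_collections() is expected to return a payload shaped
# roughly like the following (values are hypothetical); only 'id' is used here:
#   {'collectionSummaries': [{'arn': 'arn:aws:aoss:...', 'id': 'abc123xyz',
#                             'name': 'logs', 'status': 'ACTIVE'}],
#    'nextToken': '...'}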

################################################################################
# CloudWatch interface
#
def list_domain_cloudwatch_metrics(domain_name=None, region=None):
''' For a particular domain/region, list all available metrics. Different
ES versions have different metrics for them. This ensures retrieving
@@ -144,7 +170,31 @@ def list_domain_cloudwatch_metrics(domain_name=None, region=None):
for page in iter:
metrics = page['Metrics']
for metric in metrics:
resp.append(SingleMetricDescription(metric_name=metric['MetricName'],
dims=metric['Dimensions']))
return resp

def list_domain_cloudwatch_metrics_collections(collection_id=None, region=None):
''' For a particular collection/region, list all metrics available in
CloudWatch under the CollectionId dimension.
Returns a list of SingleMetricDescriptions
'''
cw = boto3.client('cloudwatch', region)
paginator = cw.get_paginator('list_metrics')
iter = paginator.paginate(
Dimensions=[
{
'Name': 'CollectionId',
'Value': collection_id
}
]
)
resp = []
for page in iter:
metrics = page['Metrics']
for metric in metrics:
resp.append(SingleMetricDescription(metric_name=metric['MetricName'],
dims=metric['Dimensions']))
return resp
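
# For reference, each page returned by the list_metrics paginator is expected
# to look roughly like this (metric and dimension values are hypothetical):
#   {'Metrics': [{'Namespace': 'AWS/AOSS',
#                 'MetricName': 'SearchRequestRate',
#                 'Dimensions': [{'Name': 'CollectionId', 'Value': 'abc123xyz'},
#                                {'Name': 'CollectionName', 'Value': 'logs'}]}]}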

@@ -161,6 +211,18 @@ def get_all_domain_metric_descriptions(doms):
resp.append(DomainMetricsAvailable(region, domain, dmets))
return resp

def get_all_domain_metric_descriptions_collections(colls):
''' Takes a dict mapping region to a list of collection ids and retrieves
the available metrics for each collection.
'''
resp = []
for region, collections in colls.items():
for collection in collections:
dmets = list_domain_cloudwatch_metrics_collections(collection_id=collection,
region=region)
resp.append(CollectionMetricsAvailable(region, collection, dmets))
return resp


def build_metric_data_queries(domain_name, region, metric_descriptions):
ret = []
@@ -170,30 +232,62 @@ def build_metric_data_queries(domain_name, region, metric_descriptions):
label = '{} {} {} {}'.format(domain_name, region, metric_name, stat)
_id = 'a' + str(uuid.uuid1()).lower().replace('-', '_')
ret.append(
{
'Id': _id,
'Label': label,
'MetricStat': {
'Metric': {
'Namespace': 'AWS/ES',
'MetricName': metric_name,
'Dimensions': md.dims
},
'Period': LAMBDA_INTERVAL, # ? any need to do more granular than 1 minute?
'Stat': stat,
}
}
)
return ret

def build_metric_data_queries_collections(collection_id, region, metric_descriptions):
ret = []
for md in metric_descriptions:
metric_name = md.metric_name
collection_name = "N/A"
index_name = "N/A"
for dimensions in md.dims:
if dimensions['Name'] == "CollectionName":
collection_name = dimensions['Value']
elif dimensions['Name'] == "IndexName":
index_name = dimensions['Value']

for stat in ['Minimum', 'Maximum', 'Average']: # What flexibility does this need?
label = '{} {} {} {} {} {}'.format(collection_id, collection_name, index_name, region, metric_name, stat)
_id = 'a' + str(uuid.uuid1()).lower().replace('-', '_')
ret.append(
{
'Id': _id,
'Label': label,
'MetricStat': {
'Metric': {
'Namespace': 'AWS/AOSS',
'MetricName': metric_name,
'Dimensions': md.dims
},
'Period': LAMBDA_INTERVAL, # ? any need to do more granular than 1 minute?
'Stat': stat,
}
}
)
return ret
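
# Illustrative example of a single MetricDataQuery entry built above
# (the Id and Label values are hypothetical):
#   {'Id': 'a1b2c3d4_...',
#    'Label': 'abc123xyz logs app-logs us-east-1 SearchRequestRate Average',
#    'MetricStat': {'Metric': {'Namespace': 'AWS/AOSS',
#                              'MetricName': 'SearchRequestRate',
#                              'Dimensions': [{'Name': 'CollectionId', 'Value': 'abc123xyz'}]},
#                   'Period': LAMBDA_INTERVAL,
#                   'Stat': 'Average'}}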


def grouper(iterable, n):
it = iter(iterable)
while True:
chunk = list(itertools.islice(it, n))
if not chunk:
return
yield chunk
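
# Example: grouper() yields successive chunks of at most n items, e.g.
#   list(grouper(range(5), 2)) == [[0, 1], [2, 3], [4]]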


def get_single_domain_metric_values(domain_name, region, metric_descriptions):
@@ -237,6 +331,48 @@ def get_single_domain_metric_values(domain_name, region, metric_descriptions):
print()
return ret

def get_single_domain_metric_values_collections(collection_id, region, metric_descriptions):
# TODO: Make this multi-collection?
ret = list()
cw = boto3.client('cloudwatch', region)
queries = build_metric_data_queries_collections(collection_id, region, metric_descriptions)
# The CW query runs from now to the last time this retrieved data. It could miss
# data points on edge case boundaries.
time_now = datetime.utcfromtimestamp(time.time())
last_timestamp = get_last_timestamp_ddb(collection_id, region)
if not last_timestamp:
last_timestamp = time_now - timedelta(minutes=15)

for group in grouper(queries, 100):
try:
paginator = cw.get_paginator('get_metric_data')
iter = paginator.paginate(MetricDataQueries=group,
StartTime=last_timestamp,
EndTime=time_now)
for page in iter:
for result in page['MetricDataResults']:
# TODO: Error handling
(collection_id, collection_name, index_name, region, metric_name, stat) = result['Label'].split(' ')
for val in zip(result['Timestamps'], result['Values']):
ts = val[0].replace(microsecond=0, tzinfo=tz.tzutc())
ret.append(SingleMetricValueCollection(
collection_id=collection_id,
collection_name=collection_name,
index_name=index_name,
region=region,
metric_name=metric_name,
stat=stat,
timestamp=ts.isoformat(),
value=val[1]
))
update_metric_timestamp_ddb(collection_id, region, time_now)
except Exception as e:
# Handle me better
print('Exception', collection_id, region)
print(e)
print()
return ret


def get_all_domain_metric_values(domains):
''' Domains is a list of DomainMetricDescriptions - tuples with domain_name,
@@ -252,11 +388,25 @@ def get_all_domain_metric_values(domains):
return res


def get_all_domain_metric_values_collections(collections):
''' Collections is a list of CollectionMetricsAvailable tuples with region,
collection_id, and a list of SingleMetricDescriptions.
Returns a list of SingleMetricValueCollection tuples.
'''
# TODO: Send a single request rather than 1 per collection/region dimension
res = list()
for collection in collections:
collection_id = collection.collection_id
region = collection.region
res.extend(get_single_domain_metric_values_collections(collection_id, region, collection.metric_descriptions))
return res


################################################################################
# Amazon OpenSearch interface
ES_AUTH = es_sink.es_auth.ESHttpAuth(DOMAIN_ADMIN_UNAME, DOMAIN_ADMIN_PW)

INDEX_DESCRIPTOR = IndexDescriptor(es_index='domains', es_v7=True, timestamped=True)
ES_DESCRIPTOR = ESDescriptor(
endpoint=DOMAIN_ENDPOINT,
index_descriptor=INDEX_DESCRIPTOR,
@@ -265,6 +415,14 @@ def get_all_domain_metric_values(domains):
ES_BUFFER = es_sink.flushing_buffer.flushing_buffer_factory(ES_DESCRIPTOR,
flush_trigger=1000)

INDEX_DESCRIPTOR_COLLECTIONS = IndexDescriptor(es_index='collections', es_v7=True, timestamped=True)
ES_DESCRIPTOR_COLLECTIONS = ESDescriptor(
endpoint=DOMAIN_ENDPOINT,
index_descriptor=INDEX_DESCRIPTOR_COLLECTIONS,
auth=ES_AUTH
)
ES_BUFFER_COLLECTIONS = es_sink.flushing_buffer.flushing_buffer_factory(ES_DESCRIPTOR_COLLECTIONS,
flush_trigger=1000)

def send_all_domain_metric_values(values):
total = 0
@@ -287,25 +445,55 @@ def send_all_domain_metric_values(values):
total_flushed += f
total += 1

print('Added {} log lines to the domain buffer'.format(total))
print('Flushed {} log lines for domains'.format(total_flushed))


def send_all_domain_metric_values_collections(vals_collections):
total = 0
total_flushed = 0
for value in vals_collections:

d = value._asdict()
line_value = d.pop('value')
metric_name = d['metric_name']
d[metric_name] = line_value

# Rename field timestamp to @timestamp
timestamp_value = d.pop('timestamp')
d['@timestamp'] = timestamp_value

log_line = json.dumps(d)

f, ignore = ES_BUFFER_COLLECTIONS.add_log_line(log_line)

total_flushed += f
total += 1

print('Added {} log lines to the collections buffer'.format(total))
print('Flushed {} log lines for collections'.format(total_flushed))
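
# Illustrative example of a document indexed into the collections index after
# the field renames above (values are hypothetical):
#   {"region": "us-east-1", "collection_name": "logs", "collection_id": "abc123xyz",
#    "index_name": "app-logs", "metric_name": "SearchRequestRate", "stat": "Average",
#    "SearchRequestRate": 1.5, "@timestamp": "2023-02-13T00:00:00+00:00"}
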
################################################################################
# Lambda handler
def handler(event, context):
doms = list_all_domains()
all_mets = get_all_domain_metric_descriptions(doms)
vals = get_all_domain_metric_values(all_mets)

colls = list_all_collections()
all_mets_collections = get_all_domain_metric_descriptions_collections(colls)
vals_collections = get_all_domain_metric_values_collections(all_mets_collections)

send_all_domain_metric_values(vals)
send_all_domain_metric_values_collections(vals_collections)

ES_BUFFER.flush()
# Flush the collections buffer as well so remaining buffered lines are sent
ES_BUFFER_COLLECTIONS.flush()


################################################################################
# Command line/test interface
if __name__ == '__main__':
# This code will normally run as a lambda function, so I don't want to add a
# command-line arg. Instead, set an environment variable as if it were
# running as lambda.
print()
print('Did you remember to set the "TABLE" environment variable with the')
Expand Down Expand Up @@ -354,3 +542,5 @@ def print_all_vals(vals):
print('{}: {}'.format(dom, count))
total += count
print('Retrieved a total of {} values'.format(total))

