-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathterraforminator.py
119 lines (102 loc) · 4.42 KB
/
terraforminator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
from tabulate import tabulate
from datetime import datetime, date
import argparse
import asyncio
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource.resources.v2022_09_01 import ResourceManagementClient
from azure.core.exceptions import ResourceNotFoundError
from resource_graph_query import run_azure_rg_query
async def list_resource_groups_with_temporary_tag(subscription_id: str):
    """
    List the resource groups of a subscription whose Temporary tag is set to true.

    The service is asked only for resource groups that carry a ``Temporary``
    tag; the tag value is then checked client-side so that ``TRUE``, ``True``
    and ``true`` are all accepted. Azure treats tag names case-insensitively,
    so the tag name is compared without regard to case as well - otherwise a
    group tagged ``temporary=true`` would be returned by the filter and then
    silently skipped.

    :param subscription_id: ID of the Azure subscription to inspect.
    :return: list of ``{'name': ..., 'location': ...}`` dicts, one per matching resource group.
    """
    credential = DefaultAzureCredential()
    resource_management_client = ResourceManagementClient(subscription_id=subscription_id, credential=credential)
    tag_filter = "tagName eq 'Temporary'"
    all_rgs_filtered = resource_management_client.resource_groups.list(filter=tag_filter)

    rgs_to_deleted = []
    for rg in all_rgs_filtered:
        if not rg.tags:
            continue
        # Compare both tag name and tag value case-insensitively; the isinstance
        # guard keeps a non-string tag value from raising on .lower().
        is_temporary = any(
            isinstance(value, str) and key.lower() == 'temporary' and value.lower() == 'true'
            for key, value in rg.tags.items()
        )
        if is_temporary:
            # Only the name and location are needed by the callers of this function.
            rgs_to_deleted.append({'name': rg.name, 'location': rg.location})
    print(rgs_to_deleted)
    return rgs_to_deleted
async def delete_resource_groups(subscription_id: str, rgs_to_be_deleted: list[dict]):
    """
    Delete each of the given resource groups, one after another.

    Every deletion is started and waited on before the next one begins. A
    group that cannot be found, or whose deletion fails for any other reason,
    is reported on stdout and the loop carries on with the remaining groups.

    :param subscription_id: ID of the subscription that owns the resource groups.
    :param rgs_to_be_deleted: dicts holding the resource group name under ``'name'``.
    :return: None
    """
    client = ResourceManagementClient(
        subscription_id=subscription_id,
        credential=DefaultAzureCredential(),
    )
    for rg in rgs_to_be_deleted:
        try:
            print(f"Deleting {rg['name']} from {subscription_id}")
            poller = client.resource_groups.begin_delete(resource_group_name=rg['name'])
            # Wait for the long-running delete operation to complete.
            poller.result()
            print(f"Successfully deleted {rg['name']}")
        except ResourceNotFoundError:
            print(f"Resource group '{rg['name']}' not found.")
        except Exception as e:
            print(f"Failed to delete resource group '{rg['name']}': {e}")
        # Short pause between deletions so the service is not overwhelmed.
        await asyncio.sleep(1)
def list_resources_in_rg(subscription_id:str, rgs_to_be_deleted: list[dict]):
    """
    Collect the resources contained in each of the given resource groups.

    A resource group whose resources cannot be listed is reported on stdout
    and skipped; resources gathered up to that point are kept.

    :param subscription_id: ID of the subscription that owns the resource groups.
    :param rgs_to_be_deleted: dicts holding the resource group name under ``'name'``.
    :return: flat list of dicts with the keys ``'name'``, ``'resource_id'``,
        ``'resource_type'`` and ``'resource_group'``.
    """
    client = ResourceManagementClient(
        subscription_id=subscription_id,
        credential=DefaultAzureCredential(),
    )
    collected = []
    for rg in rgs_to_be_deleted:
        try:
            for item in client.resources.list_by_resource_group(resource_group_name=rg['name']):
                collected.append({
                    'name': item.name,
                    'resource_id': item.id,
                    'resource_type': item.type,
                    'resource_group': rg['name'],
                })
        except Exception as e:
            print(f"Failed to reteive resources from resource group '{rg['name']}': {e}")
    return collected
async def main():
    """
    Command-line entry point: decommission the temporary resource groups of one subscription.

    Parses ``--subscription_name``, resolves it to a subscription ID through
    ``run_azure_rg_query``, lists the resource groups tagged as temporary,
    records the resources they contain, deletes the groups and finally prints
    a table of the recorded resources together with the elapsed time.

    :return: None
    """
    # Local import: the module header only pulls ``datetime`` and ``date``
    # out of the datetime module.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # value; use a timezone-aware UTC timestamp instead.
    start_time = datetime.now(timezone.utc)
    print(f"Process started at (UTC): {start_time}")
    load_dotenv()

    # The text is the program description. Passed positionally it would be
    # taken as ``prog`` and replace the program name in the usage line.
    parser = argparse.ArgumentParser(description="Decommission nolonger used resource groups in Azure.")
    parser.add_argument("--subscription_name", type=str, required=True, help="Azure subscription name")
    args = parser.parse_args()

    subscription_id = run_azure_rg_query(subscription_name=args.subscription_name)
    rgs_to_deleted = await list_resource_groups_with_temporary_tag(subscription_id=subscription_id)

    # The resource inventory is taken before the deletion; the report printed
    # below is built from this snapshot.
    details_to_display = list_resources_in_rg(subscription_id=subscription_id, rgs_to_be_deleted=rgs_to_deleted)
    await delete_resource_groups(subscription_id=subscription_id, rgs_to_be_deleted=rgs_to_deleted)

    print(f"The below resources are decommisioned on {date.today()}")
    # Extracting headers and rows
    headers = ["Name", "Type", "ID", "Resource Group Name"]
    rows = [
        [item["name"], item["resource_type"], item["resource_id"], item["resource_group"]]
        for item in details_to_display
    ]
    # Printing in tabular format
    print(tabulate(rows, headers=headers, tablefmt="grid"))

    end_time = datetime.now(timezone.utc)
    print(f"Process completed at (UTC): {end_time}")
    # Calculate and print elapsed time
    elapsed_time = end_time - start_time
    print(f"Total elapsed time: {elapsed_time} (hh:mm:ss)")
# Run the asynchronous entry point only when the file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())