"""shutdown_monitor.py: notify Discord when the application is asked to shut down."""
import asyncio
import platform
import signal
import logging
class ShutdownMonitor:
    """Watch for shutdown requests and notify Discord before closing the client.

    On Linux the monitor hooks SIGTERM and SIGINT on the running event loop.
    On Windows it polls the console for a Ctrl+C keypress from a daemon
    thread. Either trigger schedules :meth:`shutdown_sequence` exactly once.

    Args:
        discord_handler: Object exposing an awaitable ``send_message(str)``
            and a ``client`` attribute with an awaitable ``close()``.
    """

    # Delay between console polls on Windows; avoids a 100% CPU busy loop.
    _WINDOWS_POLL_INTERVAL = 0.05

    def __init__(self, discord_handler):
        self.discord_handler = discord_handler
        self.platform = platform.system()
        # The monitor is often constructed before the loop is running.
        # Capturing ``get_event_loop()`` here could bind a loop that is never
        # run, so only record a loop that is actually running; ``start()``
        # (re)binds the real one.
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = None
        # Strong reference to the in-flight shutdown task: the event loop only
        # keeps weak references, so an unreferenced task may be collected.
        self._shutdown_task = None

    async def start(self):
        """Install the platform-specific shutdown hooks and wait forever.

        This coroutine does not return on its own; it ends only when it is
        cancelled.

        Raises:
            RuntimeError: If the current platform is neither Linux nor Windows.
        """
        # Always bind to the loop that is actually executing this coroutine.
        self.loop = asyncio.get_running_loop()

        if self.platform == 'Linux':
            # Handle SIGTERM (system shutdown) and SIGINT (Ctrl+C).
            for signum in (signal.SIGTERM, signal.SIGINT):
                self.loop.add_signal_handler(signum, self.handle_shutdown)
            logging.info("Shutdown monitor is running.")
        elif self.platform == 'Windows':
            # Imported lazily, matching the original Windows-only code path.
            import threading

            threading.Thread(
                target=self._watch_windows_console,
                args=(self.loop,),
                daemon=True,
            ).start()
            logging.info("Shutdown monitor is running (Windows Ctrl+C handler).")
        else:
            logging.error(f"Unsupported platform: {self.platform}")
            raise RuntimeError(f"Unsupported platform: {self.platform}")

        # Keep the coroutine running.
        await self.loop.create_future()

    def _watch_windows_console(self, loop):
        """Poll the Windows console until Ctrl+C is read, then request shutdown.

        Runs in a daemon thread, so the hand-off to the event loop goes
        through ``call_soon_threadsafe``.
        """
        import msvcrt
        import time

        while True:
            if msvcrt.kbhit():
                if msvcrt.getch() == b'\x03':  # Ctrl+C
                    loop.call_soon_threadsafe(self.handle_shutdown)
                    break
            else:
                time.sleep(self._WINDOWS_POLL_INTERVAL)

    def handle_shutdown(self):
        """Schedule the shutdown sequence; repeated requests are ignored.

        Must be called from the event-loop thread (it is invoked as a loop
        callback by both the signal handlers and the Windows watcher).
        """
        if getattr(self, "_shutdown_task", None) is not None:
            # e.g. SIGINT arriving after SIGTERM: do not notify/close twice.
            logging.info("Shutdown already in progress; ignoring repeated request.")
            return
        logging.info("Shutdown detected.")
        self._shutdown_task = asyncio.create_task(self.shutdown_sequence())

    async def shutdown_sequence(self):
        """Send the shutdown notice, then close the Discord client.

        A failure to deliver the notice is logged and does not prevent the
        client from being closed.
        """
        message = "Application is shutting down."
        try:
            await self.discord_handler.send_message(message)
        except Exception:
            logging.exception("Failed to send shutdown notification.")
        finally:
            # Close the Discord client gracefully, even if the notice failed.
            await self.discord_handler.client.close()