-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaction_monitor.py
More file actions
78 lines (58 loc) · 2.13 KB
/
action_monitor.py
File metadata and controls
78 lines (58 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import time
import logging
import requests
import argparse
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# URL of the remote server
url = None
def report_file(path: str):
"""Sends file content to the designated server"""
try:
with open(path, "rb") as f:
requests.post(url, files={"upload_file": f})
except:
logging.warning(f"Error opening and sending the modified file: {path}")
class ActionMonitorHandler(FileSystemEventHandler):
"""Logs and sends all shell files in the chosen directory."""
def __init__(self, logger=None):
super().__init__()
def on_moved(self, event):
super().on_moved(event)
def on_created(self, event):
super().on_created(event)
def on_deleted(self, event):
super().on_deleted(event)
def on_modified(self, event):
super().on_modified(event)
what = "directory" if event.is_directory else "file"
logging.info("Modified %s: %s", what, event.src_path)
if event.src_path.endswith(".sh"):
logging.info("About to send the file contents")
report_file(event.src_path)
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--directory", required=True, help="Directory to monitor")
parser.add_argument("-u", "--url", required=True, help="URL of the remote server")
args = parser.parse_args()
url = args.url
if not os.path.exists(args.directory) or not os.path.isdir(args.directory):
logging.error("The given path doesn't exist, or isn't a directory")
exit(1)
logging.info(f"Starting to monitor the directory: {args.directory}")
event_handler = ActionMonitorHandler()
observer = Observer()
observer.schedule(event_handler, args.directory, recursive=False)
observer.start()
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
observer.stop()
observer.join()