-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathvirustotal_check.py
160 lines (118 loc) · 5.37 KB
/
virustotal_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import hashlib
import json
import os
import sys
import time
from pathlib import Path
from pprint import pprint
from virustotal import Virustotal
# Settings below are read from the environment when the module is loaded.
API_KEY = os.environ.get('VIRUSTOTAL_API_KEY')
INSTALLER_FILE_SUFFIX = os.environ.get("INSTALLER_FILE_SUFFIX", "x86.exe")
WORKSPACE_DIR = os.environ.get('WORKSPACE_DIR', '.')

# Space-separated names of antiviruses whose alarms may be false alarms.
# Example: "Antiy-AVL VBA32"
POSSIBLE_FALSE_ALARMS = os.environ.get('POSSIBLE_FALSE_ALARMS', '').split()
print(f'Antiviruses that can generate false alarms: {POSSIBLE_FALSE_ALARMS}')

# Maximum number of alarms that may be tolerated as false alarms.
# An alarm from an antivirus that is not listed in POSSIBLE_FALSE_ALARMS is
# never treated as a false alarm, whatever the threshold is.
_raw_false_alarms_threshold = os.environ.get('FALSE_ALARMS_THRESHOLD', '0')
try:
    FALSE_ALARMS_THRESHOLD = int(_raw_false_alarms_threshold)
except ValueError:
    print(f'FALSE_ALARMS_THRESHOLD environment variable should contain a number. Got: {_raw_false_alarms_threshold}')
    sys.exit(1)

WAIT_TIME = 120  # seconds to sleep between two polls (2 minutes)
MAX_WAIT_COUNT = 10  # number of polls -> 20 minutes max
def find_file(base_path, file_suffix_with_extension):
    """Return the entries directly under *base_path* whose names end with the suffix.

    Args:
        base_path: A ``pathlib.Path``-like object providing ``glob``.
        file_suffix_with_extension: Trailing part of the file name to look
            for, extension included (e.g. ``"x86.exe"``).

    Returns:
        A list with every match produced by globbing ``*<suffix>``.
    """
    pattern = '*' + file_suffix_with_extension
    return [*base_path.glob(pattern)]
def get_file_hash(filename):
    """Return the SHA-256 hex digest of the file stored at *filename*.

    The file is read in 64 KiB chunks, so it is never held in memory whole.
    """
    chunk_size = 65536
    digest = hashlib.sha256()
    with open(filename, 'rb') as stream:
        # iter() keeps calling read() until it yields the empty-bytes sentinel (EOF).
        for chunk in iter(lambda: stream.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
def get_upload_url(vt_api):
    """Request ``files/upload_url`` through *vt_api* and return the response's ``data``.

    Args:
        vt_api: Client object exposing ``request(resource, method=...)``.

    Returns:
        The ``data`` attribute of the response, used by the caller as the
        URL to upload the file to.
    """
    return vt_api.request("files/upload_url", method="GET").data
def upload_file(vt_api, upload_url, filename):
    """POST the file at *filename* to *upload_url* and return the resulting id.

    Args:
        vt_api: Client object exposing ``request(resource, files=..., method=...)``.
        upload_url: Destination the multipart upload is sent to.
        filename: Path of the file to upload.

    Returns:
        The value found at ``['data']['id']`` in the JSON body of the response.
    """
    # Open the file in a context manager so the handle is released as soon as
    # the request finishes, including when the request raises.
    with open(os.path.abspath(filename), "rb") as file_handle:
        # Dictionary containing the file to send for multipart encoding upload
        files = {"file": (os.path.basename(filename), file_handle)}
        uploaded_response = vt_api.request(upload_url, files=files, method="POST")
    return uploaded_response.json()['data']['id']
def get_file_analysis(vt_api, upload_id):
    """Fetch ``analyses/<upload_id>`` through *vt_api*, print it, and return its data.

    Args:
        vt_api: Client object exposing ``request(resource)``.
        upload_id: Identifier returned by the upload step.

    Returns:
        The ``'data'`` entry of the response's JSON body.
    """
    response = vt_api.request(f"analyses/{upload_id}")
    # Dump the response data to the job log before handing the JSON back.
    pprint(response.data)
    return response.json()['data']
def is_file_safe(analysis_json):
    """Decide whether a completed analysis lets the file pass.

    The file passes when nothing is reported as malicious or suspicious.
    Otherwise it passes only if the number of such reports does not exceed
    the module-level ``FALSE_ALARMS_THRESHOLD`` and every reporting antivirus
    is named in the module-level ``POSSIBLE_FALSE_ALARMS``.

    Args:
        analysis_json: Mapping with ``['attributes']['stats']`` (holding
            ``malicious`` and ``suspicious`` counts) and
            ``['attributes']['results']`` (per-antivirus mappings with a
            ``category`` entry).

    Returns:
        ``True`` if the file is considered safe, ``False`` otherwise.
    """
    attributes = analysis_json['attributes']
    stats = attributes['stats']
    malicious_count = stats['malicious']
    suspicious_count = stats['suspicious']

    # Clean report: nothing to weigh against the false-alarm settings.
    if not (malicious_count or suspicious_count):
        return True

    # More alarms than we are willing to tolerate, whoever raised them.
    if FALSE_ALARMS_THRESHOLD < malicious_count + suspicious_count:
        return False

    tolerated = []
    for engine, verdict in attributes['results'].items():
        if verdict['category'] not in ('malicious', 'suspicious'):
            continue
        # An alarm from an antivirus outside the list is always taken seriously.
        if engine not in POSSIBLE_FALSE_ALARMS:
            return False
        tolerated.append(engine)

    if tolerated:
        print(f"False alarms: {', '.join(tolerated)}")
    return True
def write_analysis_result_to_file(analysis_json, filename, file_hash):
    """Save the analysis next to the scanned file as ``<filename>.analysis.json``.

    The scanned file's name and hash are added to *analysis_json* itself
    (under ``filename`` and ``filehash``) before it is serialised, so the
    caller's mapping is modified in place.

    Args:
        analysis_json: Analysis data to store.
        filename: Path of the scanned file, as a string.
        file_hash: Hash of the scanned file.
    """
    analysis_filename = filename + ".analysis.json"
    print(f"Analysis results file: {analysis_filename}")
    with open(analysis_filename, 'w') as report:
        analysis_json.update(filename=filename, filehash=file_hash)
        serialised = json.dumps(analysis_json, indent=1)
        report.write(serialised)
def run_analysis(filename, *, exit_on_success=True):
    """Upload *filename* to VirusTotal, poll for the analysis, and act on the verdict.

    Args:
        filename: Path of the installer file to scan.
        exit_on_success: When true (the default, matching the historical
            behaviour) the process exits with status 0 as soon as the file is
            judged safe. When false the function returns instead, so the
            caller can go on to scan further files.

    Returns:
        ``True`` when the file is judged safe and *exit_on_success* is false.

    Exits:
        With status -1 when the file is judged unsafe, or when no completed
        analysis arrives within ``MAX_WAIT_COUNT`` polls.
    """
    vt_api = Virustotal(API_KEY=API_KEY, API_VERSION="v3")

    # Since the size of the file will be higher than 32MB, an upload url is required.
    upload_url = get_upload_url(vt_api)
    upload_id = upload_file(vt_api, upload_url, filename)
    print(f"File upload id: {upload_id}")

    file_hash = get_file_hash(filename)
    print(f"VirusTotal Results URL: https://www.virustotal.com/gui/file/{file_hash}/detection")

    # It takes time to complete the analysis, poll if analysis is complete.
    for _ in range(MAX_WAIT_COUNT):
        analysis_json = get_file_analysis(vt_api, upload_id)
        if analysis_json['attributes']['status'] == 'completed':
            # Write the analysis result to a file for reference.
            write_analysis_result_to_file(analysis_json, filename, file_hash)

            # Check if the file is safe based on the results.
            if not is_file_safe(analysis_json):
                print("Malicious file!")
                sys.exit(-1)

            print("Installer file is good to publish")
            if exit_on_success:
                sys.exit(0)
            return True

        # If the analysis is not complete yet, wait.
        time.sleep(WAIT_TIME)

    # If still not finished, then simply fail the job.
    print("\nDid not receive the reports yet. Failing the job")
    print(f"VirusTotal Results URL: https://www.virustotal.com/gui/file/{file_hash}/detection")
    sys.exit(-1)


if __name__ == '__main__':
    base_path = Path(WORKSPACE_DIR).resolve().absolute()
    print(f"Workspace directory: {base_path}")
    print(f"Installer file suffix: {INSTALLER_FILE_SUFFIX}")

    installer_files = find_file(base_path, INSTALLER_FILE_SUFFIX)
    if not installer_files:
        print("No installer files found. Exiting...")
        sys.exit(1)

    for installer_file in installer_files:
        installer_file_fullpath = str(installer_file.absolute())
        print("Checking file:", installer_file_fullpath)
        # Do not exit after the first safe file: every matching installer must
        # be scanned. An unsafe file or a timeout still exits with -1 inside
        # run_analysis; reaching the end of the loop means all files passed
        # and the script ends with status 0.
        run_analysis(installer_file_fullpath, exit_on_success=False)