-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmail_scanner.py
More file actions
333 lines (281 loc) · 13.3 KB
/
Copy pathmail_scanner.py
File metadata and controls
333 lines (281 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import imaplib
import email
from email.utils import parseaddr
from email.header import decode_header
import re
import datetime
import database
def clean_header(header_value):
    """Return a header value as plain text with RFC 2047 encoded-words decoded.

    Every fragment produced by ``decode_header`` is converted to ``str`` and
    the fragments are concatenated; surrounding whitespace is stripped.
    Falsy input yields ``""``. If decoding fails entirely, the ``str()`` of
    the original value is returned instead of raising.
    """
    if not header_value:
        return ""

    try:
        pieces = []
        for fragment, charset in decode_header(header_value):
            if not isinstance(fragment, bytes):
                pieces.append(str(fragment))
                continue
            try:
                text = fragment.decode(charset or "utf-8", errors="ignore")
            except Exception:
                # Unknown or bogus charset label: latin-1 can decode any byte.
                text = fragment.decode("latin-1", errors="ignore")
            pieces.append(text)
        return "".join(pieces).strip()
    except Exception:
        return str(header_value)
def parse_sender(from_header):
    """
    Parses the 'From' header to extract name, email, and domain.
    E.g. "Akshay Veeramalla <akshay@gmail.com>" -> ("Akshay Veeramalla", "akshay@gmail.com", "gmail.com")

    Returns a ``(name, email_address, domain)`` tuple. The address is
    lower-cased. An empty header yields
    ``("Unknown", "unknown@unknown.com", "unknown.com")``; a header without
    an ``@`` address yields the domain ``"unknown.com"``. When no display
    name is present, the local part of the address (or ``"Unknown"``) is
    used as the name.
    """
    if not from_header:
        return "Unknown", "unknown@unknown.com", "unknown.com"

    # Parse the address structure BEFORE decoding encoded-words. Decoding
    # first can expose commas or angle brackets hidden inside an encoded
    # display name (e.g. "=?utf-8?q?Doe,_John?= <john@x.com>"), which makes
    # parseaddr split the header in the wrong place.
    raw_name, email_address = parseaddr(str(from_header))
    if "@" in email_address:
        name = clean_header(raw_name)
    else:
        # Nothing usable in the raw form (e.g. the whole header is a single
        # encoded-word): fall back to decoding first, then parsing.
        name, email_address = parseaddr(clean_header(from_header))

    email_address = email_address.strip().lower()
    has_address = "@" in email_address

    domain = email_address.split("@")[-1] if has_address else "unknown.com"

    # If the display name is empty, use the local part of the address.
    if not name:
        name = email_address.split("@")[0] if has_address else "Unknown"

    return name, email_address, domain
def _list_line_mailbox(line):
    """Return the mailbox name that ends an IMAP LIST response line, or None."""
    quoted = re.search(r'"([^"]+)"\s*$', line)
    if quoted:
        return quoted.group(1)
    # Names without spaces may be sent unquoted:  (flags) "/" Archive
    bare = re.search(r'\)\s+(?:"[^"]*"|NIL)\s+(\S+)\s*$', line)
    return bare.group(1) if bare else None


def find_all_mail_folder(mail):
    r"""
    Scan the server's folder list for the 'All Mail' archive.

    ``mail`` must provide ``list()`` returning ``(status, lines)`` as
    ``imaplib`` connections do. Lookup order:

    1. a folder whose attribute list carries ``\All`` (or ``\AllMail``),
       compared case-insensitively as a whole attribute;
    2. a folder line containing one of the usual Gmail names
       ("[Gmail]/All Mail", "[Google Mail]/All Mail", "All Mail").

    Returns the folder name without surrounding quotes. Returns "INBOX" when
    listing fails, nothing matches, or an error occurs (the error is printed,
    not raised).
    """
    try:
        status, folder_list = mail.list()
        if status != "OK":
            return "INBOX"

        # Decode once for both passes. Entries that are not bytes (None for
        # an empty listing, tuples for literal names) are skipped so a single
        # odd entry does not abort the whole search.
        lines = [
            entry.decode("utf-8", errors="ignore")
            for entry in folder_list
            if isinstance(entry, bytes)
        ]

        # 1. Look at the attribute list only, so a folder *name* can never be
        #    mistaken for a flag.
        #    Typical line: '(\\HasNoChildren \\All) "/" "[Gmail]/All Mail"'
        for line in lines:
            attrs = re.match(r'\s*\(([^)]*)\)', line)
            flags = attrs.group(1).lower().split() if attrs else []
            if "\\all" in flags or "\\allmail" in flags:
                mailbox = _list_line_mailbox(line)
                if mailbox:
                    return mailbox

        # 2. Fall back to the standard Gmail folder names.
        for line in lines:
            lowered = line.lower()
            for name in ("[Gmail]/All Mail", "[Google Mail]/All Mail", "All Mail"):
                if name.lower() in lowered:
                    # Prefer the exact name as spelled by the server.
                    quoted = re.search(r'"([^"]+)"\s*$', line)
                    return quoted.group(1) if quoted else name
    except Exception as e:
        print(f"Error listing folders: {e}")
    return "INBOX"
def connect_gmail(email_address, app_password):
    """Open an SSL IMAP connection to imap.gmail.com:993 and log in.

    Returns the logged-in ``imaplib.IMAP4_SSL`` object.

    Raises:
        Exception: if connecting or logging in fails. The original error is
            attached as ``__cause__`` and its text is included in the message.
    """
    mail = None
    try:
        mail = imaplib.IMAP4_SSL("imap.gmail.com", 993)
        mail.login(email_address, app_password)
        return mail
    except Exception as e:
        # The socket is already open when login() fails; close it so a bad
        # password does not leak a connection.
        if mail is not None:
            try:
                mail.shutdown()
            except Exception:
                pass
        raise Exception(f"Failed to authenticate with Gmail: {str(e)}") from e
def _parse_fetch_metadata(fetch_data):
    """Convert an imaplib UID FETCH response into ``{uid: record}``.

    Each record is a dict with the keys "size", "subject", "from" and "date".
    Header values are stored exactly as ``Message.get`` returns them.
    """
    records = {}

    def commit(meta_text, record):
        # Store the record under the UID found in meta_text; False if none.
        uid_match = re.search(r'UID\s+(\d+)', meta_text, re.IGNORECASE)
        if not uid_match:
            return False
        size_match = re.search(r'RFC822\.SIZE\s+(\d+)', meta_text, re.IGNORECASE)
        record["size"] = int(size_match.group(1)) if size_match else 0
        records[int(uid_match.group(1))] = record
        return True

    # (envelope_text, record) for a message whose UID was not in front of the
    # header literal; the remainder of that response line follows as the next
    # plain-bytes item.
    pending = None
    for part in fetch_data:
        if isinstance(part, tuple):
            envelope = part[0].decode("utf-8", errors="ignore")
            record = {"size": 0, "subject": "", "from": "", "date": ""}
            literal = part[1] if len(part) > 1 else None
            if isinstance(literal, bytes):
                try:
                    msg = email.message_from_bytes(literal)
                    record["subject"] = msg.get("Subject", "")
                    record["from"] = msg.get("From", "")
                    record["date"] = msg.get("Date", "")
                except Exception:
                    # Best effort: keep whatever was parsed so far.
                    pass
            # Never attach these headers to a previously seen UID: either the
            # UID is in this envelope, or we wait for the trailing fragment.
            pending = None if commit(envelope, record) else (envelope, record)
        elif pending is not None and isinstance(part, bytes):
            envelope, record = pending
            commit(envelope + part.decode("utf-8", errors="ignore"), record)
            pending = None
    return records


def scan_gmail_metadata(email_address, app_password, progress_callback=None):
    """
    Index metadata of e-mails not yet present in the local database.

    Steps: initialise the database, log in, open the 'All Mail' folder
    read-only (falling back to INBOX), list all UIDs on the server, skip
    those already returned by ``database.get_all_emails()``, then fetch
    size / Subject / From / Date for the rest in batches of 1000 and store
    each message with ``database.save_email``. Category and keywords are
    taken from ``database.get_cached_sender`` when it returns an entry.

    Args:
        email_address: Gmail login.
        app_password: password passed to ``connect_gmail``.
        progress_callback: optional callable receiving human-readable
            status strings.

    Returns:
        None.

    Raises:
        Exception: connection or scanning errors are reported through
            ``progress_callback`` and re-raised. The connection is always
            logged out.
    """
    def notify(message):
        if progress_callback:
            progress_callback(message)

    database.init_db()
    notify("Connecting to Gmail secure servers...")
    try:
        mail = connect_gmail(email_address, app_password)
    except Exception as e:
        notify(f"Connection Error: {str(e)}")
        raise

    try:
        # Dynamically discover Gmail's All Mail folder
        all_mail_folder = find_all_mail_folder(mail)
        notify(f"Accessing mail repository folder: '{all_mail_folder}'...")

        # Read-only select so scanning doesn't affect read/unread states
        status, _ = mail.select(f'"{all_mail_folder}"', readonly=True)
        if status != "OK":
            notify(f"Could not open '{all_mail_folder}', falling back to INBOX...")
            mail.select("INBOX", readonly=True)

        # Fetch all email UIDs on the server
        notify("Retrieving complete list of email indices from Gmail...")
        status, uids_data = mail.uid("search", None, "ALL")
        if status != "OK" or not uids_data[0]:
            notify("Your inbox is empty!")
            return  # the finally clause logs out

        server_uids = [int(x) for x in uids_data[0].split()]
        total_emails = len(server_uids)
        notify(f"Server contains {total_emails:,} emails.")

        # UIDs already stored in the local SQLite database
        existing_emails = database.get_all_emails()
        existing_uids = {e["uid"] for e in existing_emails}

        # Delta: emails on the server that are not yet in the local index
        new_uids = [uid for uid in server_uids if uid not in existing_uids]
        if not new_uids:
            notify("Local SQLite index is already fully up-to-date!")
            return  # the finally clause logs out

        new_count = len(new_uids)
        notify(f"Syncing {new_count:,} new emails to local SQLite index...")

        # Batch fetch metadata (From, Subject, Date, Size) in chunks of 1000
        batch_size = 1000
        completed = 0
        for start in range(0, new_count, batch_size):
            chunk = new_uids[start:start + batch_size]
            uid_set = ",".join(str(uid) for uid in chunk)

            # Size and headers in one round trip (BODY.PEEK avoids marking read)
            status, fetch_data = mail.uid(
                "fetch",
                uid_set,
                "(RFC822.SIZE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM DATE)])",
            )
            if status != "OK":
                continue

            # Save parsed records to the SQLite database
            for uid, record in _parse_fetch_metadata(fetch_data).items():
                name, email_addr, domain = parse_sender(record["from"])
                subj = clean_header(record["subject"])
                # Message.get can return a Header object for values holding
                # raw 8-bit bytes; hand the database a plain string.
                date_val = str(record["date"])

                # Apply cached categories immediately during the crawl
                cached_sender = database.get_cached_sender(email_addr)
                category = cached_sender["category"] if cached_sender else None
                keywords = cached_sender["keywords"] if cached_sender else None

                database.save_email(
                    uid=uid,
                    sender_name=name,
                    sender_email=email_addr,
                    sender_domain=domain,
                    subject=subj,
                    date_sent=date_val,
                    size=record["size"],
                    category=category,
                    keywords=keywords
                )

            completed += len(chunk)
            notify(f"Crawler Progress: Indexed {completed:,}/{new_count:,} emails ({int(completed/new_count*100)}%)...")

        notify("Mail indexing completed successfully!")
    except Exception as e:
        notify(f"Scanning stopped due to error: {str(e)}")
        raise
    finally:
        try:
            mail.logout()
        except Exception:
            pass
def _find_trash_folder(mail):
    """Return the quoted-name Trash mailbox from ``mail.list()``, or None.

    A folder carrying the ``\\Trash`` attribute wins; otherwise the first
    folder line containing "trash" (case-insensitive) is used.
    """
    status, folders = mail.list()
    if status != "OK":
        return None

    by_name = None
    for entry in folders:
        if not isinstance(entry, bytes):
            continue
        line = entry.decode("utf-8", errors="ignore")
        quoted = re.search(r'"([^"]+)"\s*$', line)
        if not quoted:
            continue
        attrs = re.match(r'\s*\(([^)]*)\)', line)
        flags = attrs.group(1).lower().split() if attrs else []
        if "\\trash" in flags:
            return quoted.group(1)
        if by_name is None and "trash" in line.lower():
            by_name = quoted.group(1)
    return by_name


def bulk_delete_emails(email_address, app_password, uids, progress_callback=None):
    r"""
    Delete the given e-mail UIDs on Gmail and in the local database.

    The UIDs are processed in batches of 500: each batch is copied to the
    Trash folder (when one is found), flagged \Deleted in the selected
    folder, and finally expunged. Only UIDs whose server commands returned
    "OK" are removed from the local index via
    ``database.delete_emails_from_db``.

    Args:
        email_address: Gmail login.
        app_password: password passed to ``connect_gmail``.
        uids: sequence of message UIDs; an empty/None value is a no-op.
        progress_callback: optional callable receiving status strings.

    Returns:
        None.

    Raises:
        Exception: connection errors and server errors are reported through
            ``progress_callback`` and re-raised. A ``RuntimeError`` is raised
            when the server rejects a COPY or STORE command; batches that
            succeeded before that point are still expunged and removed from
            the local index. The connection is always logged out.
    """
    def notify(message):
        if progress_callback:
            progress_callback(message)

    if not uids:
        notify("No emails selected for deletion.")
        return

    total_to_delete = len(uids)
    notify(f"Connecting to Gmail to permanently delete {total_to_delete:,} emails...")
    try:
        mail = connect_gmail(email_address, app_password)
    except Exception as e:
        notify(f"Connection Error: {str(e)}")
        raise

    try:
        all_mail_folder = find_all_mail_folder(mail)
        # Select folder in read-write mode (readonly=False) to allow edits
        status, _ = mail.select(f'"{all_mail_folder}"', readonly=False)
        if status != "OK":
            mail.select("INBOX", readonly=False)

        # Gmail IMAP deletion model: copy the message to Trash, then flag it
        # \Deleted in the source folder. Without a Trash folder only the flag
        # is set.
        trash_folder = _find_trash_folder(mail)

        batch_size = 500  # sub-batches keep the IMAP command line short
        processed = []
        failure = None
        for start in range(0, total_to_delete, batch_size):
            chunk = uids[start:start + batch_size]
            uid_set = ",".join(str(uid) for uid in chunk)

            if trash_folder:
                # imaplib returns "NO" instead of raising; never flag or
                # expunge messages whose Trash copy was refused.
                status, _ = mail.uid("COPY", uid_set, f'"{trash_folder}"')
                if status != "OK":
                    failure = f"could not copy emails to '{trash_folder}'"
                    break

            status, _ = mail.uid("STORE", uid_set, "+FLAGS", "\\Deleted")
            if status != "OK":
                failure = "could not flag emails as \\Deleted"
                break

            processed.extend(chunk)
            notify(f"Server Deletion: Moved {len(processed):,}/{total_to_delete:,} emails to Trash...")

        if processed:
            # Commit deletions on server
            notify("Expunging server to permanently release storage...")
            mail.expunge()

            # Keep the local index in step with what the server accepted
            notify("Synchronizing local SQLite database index...")
            database.delete_emails_from_db(processed)

        if failure:
            raise RuntimeError(
                f"{failure} (stopped after {len(processed):,} of {total_to_delete:,} emails)"
            )

        notify(f"Successfully deleted {total_to_delete:,} emails from both Gmail and SQLite database!")
    except Exception as e:
        notify(f"Deletion failed: {str(e)}")
        raise
    finally:
        try:
            mail.logout()
        except Exception:
            pass