-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimap_client.py
More file actions
207 lines (177 loc) · 7.08 KB
/
Copy pathimap_client.py
File metadata and controls
207 lines (177 loc) · 7.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""IMAP client logic for Universal Mail Cleaner.
Contains ImapService (IMAP connection/operations) and decode_header_str helper.
"""
import imaplib
import email
import email.header
import logging
from datetime import datetime, timedelta
from typing import Callable, Optional
from models import MailAccount, CleanRule
# keyring is an optional dependency: if it cannot be imported the module still
# loads, and KEYRING_AVAIL records that no stored password can be looked up.
try:
    import keyring
    KEYRING_AVAIL = True
except ImportError:
    KEYRING_AVAIL = False

# Used both as the keyring service name (see ImapService.connect) and as the
# name of this module's logger.
APP_NAME = "MailCleaner_V8_Universal"
logger = logging.getLogger(APP_NAME)
def decode_header_str(header_val) -> str:
    """Decode an email header value into a readable string.

    RFC 2047 encoded words (``=?charset?b|q?...?=``) are decoded part by
    part. Undecodable bytes are dropped rather than raising.

    Args:
        header_val: Raw header value (str or bytes).

    Returns:
        The decoded header text. ``"(No Subject)"`` when the value is empty
        or None. The raw value converted with ``str()`` when the header
        cannot be parsed at all.
    """
    # Imported locally so the function does not rely on ``email.errors``
    # having been loaded as a side effect of ``import email.header``.
    from email.errors import HeaderParseError

    if isinstance(header_val, bytes):
        # decode_header() matches with a str regex and raises TypeError on
        # bytes, so normalise to text first.
        header_val = header_val.decode("utf-8", errors="ignore")
    if not header_val:
        return "(No Subject)"

    try:
        chunks = []
        for text, charset in email.header.decode_header(header_val):
            if not isinstance(text, bytes):
                chunks.append(str(text))
                continue
            try:
                chunks.append(text.decode(charset or "utf-8", errors="ignore"))
            except LookupError:
                # Unknown charset label (e.g. "unknown-8bit"): decode just
                # this part as UTF-8 instead of discarding the whole header.
                chunks.append(text.decode("utf-8", errors="ignore"))
        return "".join(chunks)
    except (HeaderParseError, UnicodeDecodeError, LookupError,
            AttributeError, TypeError) as e:
        # HeaderParseError covers malformed base64 in an encoded word.
        logger.debug("Header decoding error: %s", e)
        return str(header_val)
class ImapService:
    """IMAP service for email operations.

    Manages a single IMAP4_SSL connection to one mail account at a time and
    provides folder discovery (including trash-folder detection) and the
    translation of clean-up rules into IMAP SEARCH criteria.
    """

    # Returned whenever the trash folder cannot be determined.
    _DEFAULT_TRASH = "Trash"

    # Well-known trash folder names, tried in this order. Folder names come
    # back from the server in modified UTF-7, so the German Exchange name is
    # listed in both its readable and its wire form.
    _TRASH_CANDIDATES = (
        "Trash", "Papierkorb", "Deleted Items",
        "Gelöschte Elemente", "Gel&APY-schte Elemente",
        "INBOX.Trash", "Bin",
    )

    # IMAP dates require English month abbreviations; strftime("%b") follows
    # the process locale and would produce e.g. "Okt" or "Dez".
    _IMAP_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")

    def __init__(self, log_func: Callable[[str], None]) -> None:
        """Initializes the IMAP service.

        Args:
            log_func: Callback function for log output (e.g. Signal.emit).
        """
        self.log = log_func
        self.conn: Optional[imaplib.IMAP4_SSL] = None
        self.current_account: Optional["MailAccount"] = None

    def connect(self, account: "MailAccount") -> bool:
        """Establishes an IMAP4_SSL connection to an account.

        Any previously open connection is closed first, so ``self.conn``
        never belongs to a different account than ``self.current_account``.

        Args:
            account: MailAccount object with name, host, port, and user.

        Returns:
            True on successful login, False on error (no password available,
            connection failure, or rejected login).
        """
        self.disconnect()
        self.current_account = account

        password = ""
        if KEYRING_AVAIL:
            try:
                password = keyring.get_password(APP_NAME, account.name)
            except Exception as e:
                logger.warning("Keyring read error for %s: %s", account.name, e)
        if not password:
            self.log(f"❌ No password found for {account.name}.")
            return False

        try:
            self.log(f"🔌 Connecting to {account.host}...")
            self.conn = imaplib.IMAP4_SSL(account.host, account.port, timeout=30)
            self.conn.login(account.user, password)
            self.log(f"✅ Logged in as {account.user}")
            return True
        except Exception as e:
            self.log(f"❌ Login error ({account.name}): {e}")
            # A connection that was opened but could not log in must not be
            # left behind: close it and reset self.conn.
            self.disconnect()
            return False

    def disconnect(self) -> None:
        """Closes the IMAP connection cleanly; does nothing if not connected."""
        if not self.conn:
            return
        try:
            self.conn.logout()
        except Exception as e:
            logger.warning("IMAP logout error: %s", e)
        finally:
            self.conn = None

    @staticmethod
    def _parse_list_entry(entry) -> tuple:
        """Splits one raw LIST response item into (flags, folder_name).

        ``flags`` is a frozenset of lower-cased attributes such as
        ``\\hasnochildren``. ``folder_name`` is "" when nothing usable was
        found. The name is returned exactly as the server sent it.
        """
        literal_name = None
        if isinstance(entry, tuple):
            # imaplib yields (header, literal) when the server sent the
            # mailbox name as an IMAP literal ({n} followed by n bytes).
            if len(entry) < 2:
                return frozenset(), ""
            raw_name = entry[1]
            literal_name = (raw_name.decode("utf-8", errors="replace")
                            if isinstance(raw_name, bytes) else str(raw_name))
            entry = entry[0]

        line = (entry.decode("utf-8", errors="replace")
                if isinstance(entry, bytes) else str(entry)).strip()

        flags = frozenset()
        if line.startswith("("):
            close = line.find(")")
            if close != -1:
                flags = frozenset(flag.lower() for flag in line[1:close].split())

        if literal_name is not None:
            return flags, literal_name
        # Format: (\HasNoChildren) "/" "INBOX" or (\HasNoChildren) "/" INBOX
        # Quoted names (e.g. "Sent Items") must not be split on the inner space.
        if line.endswith('"'):
            open_q = line.rfind('"', 0, len(line) - 1)
            return flags, line[open_q + 1:-1]
        return flags, line.rsplit(" ", 1)[-1]

    def _fetch_folder_entries(self) -> Optional[list]:
        """Runs LIST and returns [(flags, name), ...], or None if not OK."""
        status, raw_entries = self.conn.list()
        if status != "OK":
            return None
        parsed = (self._parse_list_entry(entry) for entry in raw_entries if entry)
        return [(flags, name) for flags, name in parsed if name]

    def find_trash_folder(self) -> str:
        """Tries to detect the trash folder.

        Order of precedence: the folder configured on the current account,
        a folder the server marks with the ``\\Trash`` special-use attribute,
        then the first well-known trash name found in the folder list.

        Returns:
            Name of the trash folder (fallback: "Trash").
        """
        account = self.current_account
        if account is not None and account.trash_folder:
            return account.trash_folder
        if not self.conn:
            return self._DEFAULT_TRASH

        try:
            entries = self._fetch_folder_entries()
            if entries is None:
                return self._DEFAULT_TRASH

            # The server's own marking is independent of language and naming.
            for flags, name in entries:
                if "\\trash" in flags:
                    self.log(f"🗑️ Trash folder detected: {name}")
                    return name

            for candidate in self._TRASH_CANDIDATES:
                wanted = candidate.lower()
                for _flags, name in entries:
                    lowered = name.lower()
                    # Match the full name or the last "/"-separated part,
                    # e.g. "[Gmail]/Trash"; both case-insensitively.
                    if lowered == wanted or lowered.endswith(f"/{wanted}"):
                        self.log(f"🗑️ Trash folder detected: {name}")
                        return name
            return self._DEFAULT_TRASH
        except Exception as e:
            logger.warning("find_trash_folder error: %s", e)
            return self._DEFAULT_TRASH

    def list_folders(self) -> list:
        """Lists all available IMAP folders for the connected account.

        Returns:
            List of folder name strings, or ['INBOX'] when not connected,
            when the server reports an error, or when no folder was found.
        """
        if not self.conn:
            return ["INBOX"]
        try:
            entries = self._fetch_folder_entries()
            if not entries:
                return ["INBOX"]
            return [name for _flags, name in entries]
        except Exception as e:
            logger.warning("list_folders error: %s", e)
            return ["INBOX"]

    @staticmethod
    def _sanitize_search_text(value) -> str:
        """Makes ``value`` safe to embed in a quoted IMAP search string.

        Backslashes are escaped, double quotes are removed, and CR/LF are
        removed because a quoted string must stay on one command line.

        Raises:
            ValueError: If nothing but whitespace remains.
        """
        text = value.replace("\\", "\\\\").replace('"', "")
        text = text.replace("\r", "").replace("\n", "")
        if not text.strip():
            # An empty search string is not a meaningful filter and may
            # match far more mail than intended; treat it as a bad rule.
            raise ValueError("empty search text")
        return text

    def get_search_criteria(self, rule: "CleanRule") -> Optional[str]:
        """Translates a CleanRule into an IMAP search command string.

        Args:
            rule: CleanRule object with name, filter_type and value.

        Returns:
            IMAP search string, or None when the filter type is unknown or
            the value is invalid (not a number, negative, or empty text).
        """
        try:
            kind = rule.filter_type
            if kind == "older_than_days":
                days = int(rule.value)
                if days < 0:
                    # A negative age would put the cutoff in the future and
                    # select every message in the folder.
                    raise ValueError("days must not be negative")
                cutoff = datetime.now() - timedelta(days=days)
                month = self._IMAP_MONTHS[cutoff.month - 1]
                return f'(BEFORE "{cutoff.day:02d}-{month}-{cutoff.year:04d}")'
            if kind == "sender":
                return f'(FROM "{self._sanitize_search_text(rule.value)}")'
            if kind == "subject":
                return f'(SUBJECT "{self._sanitize_search_text(rule.value)}")'
            if kind == "size_mb":
                size_bytes = int(float(rule.value) * 1024 * 1024)
                if size_bytes < 0:
                    raise ValueError("size must not be negative")
                return f'(LARGER {size_bytes})'
        except (ValueError, TypeError, AttributeError, OverflowError) as e:
            logger.warning("get_search_criteria error for rule '%s': %s",
                           getattr(rule, "name", "?"), e)
        return None