-
-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathmain.py
More file actions
3308 lines (2655 loc) · 144 KB
/
main.py
File metadata and controls
3308 lines (2655 loc) · 144 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Copyright (C) 2023-2026 Johannes Habel
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Contact:
E-Mail: EchterAlsFake@proton.me
Discord: echteralsfake (faster response)
"""
import configparser
import logging
# Stop Splash Screen
import os
import sys
import tempfile
import threading
import httpx
FORCE_TEST_RUN = False
if "NUITKA_ONEFILE_PARENT" in os.environ:
splash_filename = os.path.join(
tempfile.gettempdir(),
f"onefile_{int(os.environ['NUITKA_ONEFILE_PARENT'])}_splash_feedback.tmp"
)
if os.path.exists(splash_filename):
os.unlink(splash_filename)
from PySide6.QtWidgets import QApplication
app = QApplication(sys.argv)
# macOS Setup...
if sys.platform == "darwin" and not FORCE_TEST_RUN:
from src.backend.macos_setup import macos_setup, SparkleUpdater
macos_setup()
# Necessary imports for splashscreen
import src.frontend.UI.resources
from PySide6.QtGui import QPixmap
from src.frontend.UI.splashscreen import ModernSplashScreen
splash_pixmap = QPixmap(":/images/graphics/splashscreen.png")
splash = ModernSplashScreen(splash_pixmap)
splash.show()
splash.showMessage("Importing (General).")
app.processEvents()
# General imports
import time
import os.path
import argparse
import markdown
import tempfile
import webbrowser
import subprocess
from typing import Callable, Iterable
from collections import deque
from threading import Event, Lock
from itertools import islice, chain
splash.showMessage("Importing (Backend).")
app.processEvents()
# Backend imports
from src.backend import clients # Singleton instance for the client objects (really important)
from src.backend.database import *
from src.backend.shared_gui import *
from src.backend.shared_functions import *
from src.backend.helper_functions import *
from src.backend.clients import VideoAttributes
from src.backend.check_license import LicenseManager
import src.backend.shared_functions as shared_functions
from src.backend.config import __version__, PUBLIC_KEY_B64, shared_config, IS_SOURCE_RUN, TEMP_DIRECTORY, TEMP_DIRECTORY_STATES, TEMP_DIRECTORY_SEGMENTS
splash.showMessage("Importing (Frontend).")
app.processEvents()
# Frontend imports
from src.frontend.UI.theme import *
from src.frontend.UI.ssl_warning import *
from src.frontend.translations.strings import *
from src.frontend.UI.license import License, Disclaimer
from src.frontend.UI.thumbnail_viewer import ImageViewer
from src.frontend.UI.ui_form_main_window import Ui_PornFetch_UI
from src.frontend.UI.pornfetch_info_dialog import PornFetchInfoWidget
from src.frontend.UI.custom_combo_box import ComboPopupFitter, make_quality_combobox
splash.showMessage("Importing (PySide6 - FULL).")
app.processEvents()
# Qt / PySide6 related imports
from PySide6.QtGui import QIcon, QFontDatabase, QPixmap, QShortcut, QKeySequence
from PySide6.QtCore import (QTextStream, QRunnable, QThreadPool, QSemaphore, Qt, QLocale, QSize,
QTranslator, QCoreApplication, QStandardPaths, QSettings, Slot)
from PySide6.QtWidgets import (QTreeWidgetItem, QButtonGroup, QFileDialog, QHeaderView, QComboBox, QLabel,
QInputDialog, QMainWindow, QProgressBar, QVBoxLayout, QSizePolicy, QLayout)
splash.showMessage("Importing (APIs).")
app.processEvents()
# Errors from different APIs
from phub import errors as ph_errors
from xnxx_api.modules.errors import InvalidResponse
from phub.errors import VideoError as VideoError_PH
from base_api.modules.errors import ProxySSLError, InvalidProxy
from xvideos_api.modules.errors import (VideoUnavailable as VideoUnavailable_XV)
from eporner_api.modules.errors import NotAvailable as NotAvailable_EP, VideoDisabled as VideoDisabled_EP
from youporn_api.modules.errors import VideoUnavailable as VideoUnavailable_YP, RegionBlocked as RegionBlocked_YP
from hqporner_api.modules.errors import InvalidActress as InvalidActress_HQ, NoVideosFound, NotAvailable as NotAvailable_HQ, WeirdError as WeirdError_HQ
# Other
from hqporner_api.api import Sort as hq_Sort
from eporner_api.modules.locals import Category as ep_Category
splash.showMessage("Importing (AV - FFMPEG).")
app.processEvents()
try:
from av import open as av_open # Don't ask
from av.audio.resampler import AudioResampler # Don't ask
FORCE_DISABLE_AV = False
except Exception:
FORCE_DISABLE_AV = True
# ---- Module-level state shared by the classes and functions in this file ----
FORCE_PORTABLE_RUN: bool = False # Holds a value for argparse later (see main function)
total_segments: int = 0 # Total segments kept in a queue (for total progress tracking)
downloaded_segments: int = 0 # Amount of segments that have been downloaded (for total progress tracking)
total_downloaded_videos: int = 0 # All videos that actually successfully downloaded
session_urls: list = [] # Collects all URLs used in the current session. Used for the URL export function (CTRL + E)
conf: configparser.ConfigParser = shared_config # Holds the configuration instance (converted to QSettings INI format)
stop_flag: threading.Event = Event() # Stops loading videos into the tree widget (does not stop any downloads)
_download_lock: threading.Lock = Lock() # Module-level lock; the original author notes its purpose is unclear - TODO confirm it is still needed
video_data: clients.VideoData = clients.VideoData() # Stores general video data as well as the data for each loaded video
settings: QSettings = QSettings() # Global instance of the settings used in Porn Fetch (rebound by InstallThread / UninstallThread)
logger = shared_functions.setup_logger("Porn Fetch - [MAIN]", log_file="PornFetch.log", level=logging.DEBUG)
# Location of the license file: "pornfetch.license" inside Qt's AppConfigLocation
license_storage_path: str = os.path.join(QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppConfigLocation), "pornfetch.license")
x: bool = False # Don't ask (this is a secret ;)
def _resolve_config_path(portable: bool, portable_dir: str | None = None) -> Path:
"""
Portable -> ./config.ini (portable_dir or CWD)
Installed -> config.ini in AppConfigLocation (per-user standard location)
"""
if portable:
base = Path(portable_dir) if portable_dir else Path.cwd()
return base / "config.ini"
cfg_dir = Path(QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppConfigLocation))
QDir().mkpath(str(cfg_dir))
return cfg_dir / "config.ini"
# We need this because the configuration path differs between a portable run and an installed one.
def make_settings(portable: bool, portable_dir: str | None = None) -> QSettings:
    """
    Build a QSettings object backed by a real INI file (portable or installed).

    The path comes from ``_resolve_config_path``; ``ensure_config_file`` is run on it first so the
    INI exists and contains merged defaults (no overwriting user values).
    The INI format is needed because the CLI can't use QSettings, but the GUI should, so both
    stay compatible by sharing one file.
    """
    config_file = _resolve_config_path(portable, portable_dir)
    ensure_config_file(config_file)
    settings_obj = QSettings(str(config_file), QSettings.Format.IniFormat)
    return settings_obj
# make_settings ensures the configuration file exists, resolves its current path and wraps the INI file
# in an actual QSettings object that I can work with.
# I wouldn't really need to use QSettings here, but if Qt provides a method, why not use it
class LicenseWidget(QWidget):
    """
    Widget that lets a user import a license file and shows whether the installed license is valid.
    Still in experimental / beta mode, will be improved in v3.9 # TODO
    """

    def __init__(self, setup_restrictions: Callable, parent=None) -> None:
        super().__init__(parent)
        # Callback that applies the restrictions; it is invoked on every status refresh
        self.setup_restrictions = setup_restrictions
        self.lic = LicenseManager(
            public_key_b64=PUBLIC_KEY_B64,        # Public key used to check the generated license
            storage_path=default_license_path(),  # Where the license is stored
            expected_product="porn-fetch",        # Just an identifier
        )

        self.status = QLabel()
        self.btn_import = QPushButton("Import license…")
        self.btn_import.clicked.connect(self.import_license)

        # Status label on top, import button below it
        vbox = QVBoxLayout(self)
        for widget in (self.status, self.btn_import):
            vbox.addWidget(widget)

        self.refresh_status()

    def refresh_status(self) -> None:
        """Reload the installed license, display its state and re-apply the restrictions."""
        result = self.lic.load_installed()
        verdict = "✅ Valid" if result.valid else "❌ Not valid"
        self.status.setText(f"License status: {verdict}\n{result.reason}")
        self.setup_restrictions()

    def import_license(self) -> None:
        """Ask for a license file, install it, show the outcome and refresh the status."""
        chosen_path, _selected_filter = QFileDialog.getOpenFileName(
            self, "Select license file", "", "License (*.license);;All files (*)"
        )
        if chosen_path:  # An empty path means the dialog was dismissed
            result = self.lic.install_from_file(Path(chosen_path))
            QMessageBox.information(self, "License", result.reason)
            self.refresh_status()
class InstallThread(QRunnable):
    """
    Installs Porn Fetch for Windows and Linux based systems.
    macOS has its own method and is not handled here.

    The installation works as simple as we take the install directory from a Qt provided default path, on
    Linux typically somewhere in the user directory, and on Windows in the APPDATA directory.
    We will take the main executable of Porn Fetch + settings, write those into the new directory
    and write a desktop entry file / Shortcut with the executable path + Logo,
    so that the user can run Porn Fetch from their start menu.

    Results are reported through ``self.signals``: ``start_undefined_range`` / ``stop_undefined_range``
    around the work and ``install_finished`` with ``[success, error_text]`` at the end.
    """

    def __init__(self, app_name: str, app_id: str = "pornfetch", org_name: str = "EchterAlsFake",
                 portable_config_path: str | None = None) -> None:
        """
        :param app_name: Custom app name, otherwise 'Porn Fetch'
        :param app_id: Used for the desktop file name, etc.
        :param org_name: Organization name handed to Qt
        :param portable_config_path: Path of the portable config.ini to migrate; when empty,
                                     ``./config.ini`` in the current working directory is used
        """
        super().__init__()
        global settings
        # At the first run, I assume the user goes for a portable install-type, however if the installation
        # is called we need to switch that behaviour.
        # No annotation on this assignment: annotating a name that is declared `global` in the same
        # function is a SyntaxError (PEP 526).
        settings = make_settings(portable=False)
        self.app_name: str = app_name
        self.app_id: str = app_id
        self.org_name: str = org_name  # All handled in config.py
        self.portable_config_path: str | None = portable_config_path
        self.signals: Signals = Signals()  # Signals for error / progress reporting
        self.logger = setup_logger(name="Porn Fetch - [InstallThread]", level=logging.DEBUG)

    def run(self):
        """Run the platform specific installation and report the outcome via signals."""
        settings.setValue("Misc/app_name", self.app_name)  # Sets app name, because we need that later in PF
        try:
            self.signals.start_undefined_range.emit()  # Starts a loading animation until we have more information

            # These matter for QSettings() "installed" mode:
            QCoreApplication.setOrganizationName(self.org_name)
            QCoreApplication.setApplicationName(self.app_name)

            if sys.platform.startswith("linux"):
                self._install_linux_user()
            elif sys.platform == "win32":
                self._install_windows_user()
            else:
                raise RuntimeError(f"Unsupported platform: {sys.platform}")

        except Exception:  # Some error happened during installation
            error = traceback.format_exc()
            self.logger.error(error)
            self.signals.install_finished.emit([False, error])  # Report the error (shows GUI message)
            self.signals.stop_undefined_range.emit()
            return

        self.signals.stop_undefined_range.emit()
        self.signals.install_finished.emit([True, ""])  # Successful install :)

    def _migrate_portable_settings(self, install_dir: str) -> None:
        """
        Copy current portable config.ini into the installed working directory
        so the installed run keeps user settings.

        Any failure is logged as a warning and does not abort the installation.
        """
        try:
            src = self.portable_config_path
            if not src:
                src = str(_resolve_config_path(portable=True, portable_dir=os.getcwd()))
            dst = str(_resolve_config_path(portable=True, portable_dir=install_dir))

            if src and os.path.exists(src):
                copy_overwrite_atomic(src, dst)
                self.logger.info(f"Migrated settings: {src} -> {dst}")
            else:
                self.logger.warning(f"No portable config found at {src}; creating defaults at {dst}")
                ensure_config_file(Path(dst))

            # Mark the copied / freshly created config as belonging to an installed run
            s = QSettings(dst, QSettings.Format.IniFormat)
            s.setValue("Misc/install_type", "installed")
            s.setValue("Misc/app_name", self.app_name)
            s.sync()
        except Exception as e:
            self.logger.warning(f"Settings migration failed: {e}")

    # ----------------------------
    # Linux (user-local install)
    # ----------------------------
    def _install_linux_user(self) -> None:
        """
        Copy the executable and logo into AppLocalDataLocation, migrate the settings and
        write a ``<app_id>.desktop`` entry into the applications directory.

        :raises RuntimeError: re-raised when copying the executable fails, so that ``run``
                              reports a failed installation instead of a successful one
        """
        filename = "PornFetch_Linux_GUI_x64.bin"  # Typical filename, but needs to be improved # TODO
        arm_filename = "PornFetch_Linux_GUI_arm64.bin"  # For ARM based CPUs
        # Probe for the Linux ARM build (the previous check looked for the Windows file name)
        if os.path.exists(arm_filename):
            filename = arm_filename

        # Install "payload" (binary + assets) into local app data:
        install_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppLocalDataLocation)
        mkpath(install_dir)

        # We use the provided path by Qt as this is the most stable option for this
        apps_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.ApplicationsLocation)
        if not apps_dir:
            # Rare, but some environments can return empty; keep a safe fallback.
            apps_dir = os.path.expanduser("~/.local/share/applications")
        mkpath(apps_dir)

        src_exe = get_original_executable_path()
        dst_exe = os.path.join(install_dir, filename)
        try:
            copy_overwrite_atomic(src=src_exe, dst=dst_exe)
        except RuntimeError:
            self.signals.error_signal.emit("""
A Runtime error occurred during the installation process. This typically occurs, because I couldn't
find the real path of the extracted file from the main application.
In short: You can't fix that, please report this!""")
            # Without the binary there is nothing to install; let run() mark the install as failed
            raise

        chmod_755(dst_exe)  # Grant execute permission

        icon_dst = os.path.join(install_dir, "logo.png")
        if not QFile.exists(icon_dst):
            QFile.copy(":/images/graphics/logo.png", icon_dst)  # Logo comes from the embedded resources
        # On Linux .png files are typically fine, on Windows this is handled differently

        self._migrate_portable_settings(install_dir)

        # Write desktop file atomically.
        # "Version" is the version of the Desktop Entry Specification, not of the application.
        desktop_path = os.path.join(apps_dir, f"{self.app_id}.desktop")
        entry = f"""[Desktop Entry]
Version=1.0
Type=Application
Name={self.app_name}
Exec="{dst_exe}" %F
Icon={icon_dst}
Path={install_dir}
Terminal=false
Categories=Utility;
StartupNotify=true
"""
        write_text_atomic(desktop_path, entry)  # Creates the desktop entry for running PF from start menu

        # Store "installed" flag using Qt settings
        installed_settings = make_settings(portable=False)
        installed_settings.setValue("Misc/install_type", "installed")
        installed_settings.sync()
        self.logger.info(f"Installed to {install_dir}, desktop entry {desktop_path}")

    # ----------------------------
    # Windows (user-local install)
    # ----------------------------
    def _install_windows_user(self) -> None:
        """
        Move/copy the executable into AppLocalDataLocation, migrate the settings and create
        a Start Menu shortcut named after ``app_name``.

        :raises RuntimeError: when the payload file is missing next to the working directory or
                              the Start Menu directory cannot be determined
        """
        import win32com.client  # pywin32; Only available on Windows systems

        filename = "PornFetch_Windows_GUI_x64.exe"  # Needs to be improved # TODO
        # NOTE(review): the ARM payload is looked up with a ".bin" suffix - confirm that is the real name
        if os.path.exists("PornFetch_Windows_GUI_arm64.bin"):
            filename = "PornFetch_Windows_GUI_arm64.bin"  # For ARM based CPUs

        # Every comment that hasn't been done here, see Linux install as it's the same
        install_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppLocalDataLocation)
        mkpath(install_dir)

        src_exe = filename  # Relative to the current working directory
        if not os.path.exists(src_exe):
            raise RuntimeError(f"Missing installer payload: {src_exe}")
        dst_exe = os.path.join(install_dir, filename)
        move_or_copy(src_exe, dst_exe)

        self._migrate_portable_settings(install_dir)

        # Settings flag
        installed_settings = make_settings(portable=False)
        installed_settings.setValue("Misc/install_type", "installed")
        installed_settings.sync()

        # Start Menu Programs folder via Qt
        start_menu_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.ApplicationsLocation)
        if not start_menu_dir:
            # Fallback (usually not needed). os.getenv may return None, which os.path.join rejects
            # with a TypeError, so fail with a readable message instead.
            appdata = os.getenv("APPDATA")
            if not appdata:
                raise RuntimeError("Could not determine the Start Menu directory (APPDATA is not set)")
            start_menu_dir = os.path.join(appdata, "Microsoft", "Windows", "Start Menu", "Programs")
        mkpath(start_menu_dir)

        shortcut_path = os.path.join(start_menu_dir, f"{self.app_name}.lnk")
        shell = win32com.client.Dispatch("WScript.Shell")
        shortcut = shell.CreateShortcut(shortcut_path)
        shortcut.TargetPath = dst_exe
        shortcut.WorkingDirectory = install_dir
        shortcut.IconLocation = dst_exe
        shortcut.Save()
        self.logger.info(f"Installed to {install_dir}, shortcut {shortcut_path}")
class UninstallThread(QRunnable):
    """
    Uninstalls the *user-local installed* version:
    - Linux: removes ~/.local/share/applications/<app_id>.desktop
             removes AppLocalDataLocation payload folder (binary/icon)
    - Windows: removes Start Menu shortcut
               removes AppLocalDataLocation payload folder
               uses a .bat helper to delete after app exit (because Windows locks running exe)

    The result is reported as ``[success, error_text]`` through ``uninstall_finished`` when the
    Signals object provides it, otherwise through ``install_finished``.
    """

    def __init__(self, app_id: str = "pornfetch", org_name: str = "EchterAlsFake") -> None:
        super().__init__()
        global settings
        # No annotation on this assignment: annotating a name that is declared `global` in the same
        # function is a SyntaxError (PEP 526).
        settings = make_settings(portable=False)
        # NOTE(review): value() yields None when "Misc/app_name" was never stored - confirm callers
        # only start an uninstall after a completed install.
        self.app_name: str = settings.value("Misc/app_name")
        self.app_id: str = app_id
        self.org_name: str = org_name
        self.signals: Signals = Signals()
        self.logger = setup_logger(name="PornFetch - [UninstallThread]", level=logging.DEBUG)

    def _emit_finished(self, success: bool, error: str) -> None:
        """Emit the result on ``uninstall_finished`` if that signal exists, else on ``install_finished``."""
        if hasattr(self.signals, "uninstall_finished"):
            self.signals.uninstall_finished.emit([success, error])
        else:
            self.signals.install_finished.emit([success, error])

    def run(self):
        """Run the platform specific uninstall and report the outcome via signals."""
        try:
            self.signals.start_undefined_range.emit()

            # Match the settings identity of installed mode.
            # This is the reason why we need to save the app name in config, so that we later know
            # where it's actually installed
            QCoreApplication.setOrganizationName(self.org_name)
            QCoreApplication.setApplicationName(self.app_name)

            if sys.platform.startswith("linux"):
                self._uninstall_linux_user()
            elif sys.platform == "win32":
                self._uninstall_windows_user()
            else:
                raise RuntimeError(f"Unsupported platform: {sys.platform}")

        except Exception:
            error = traceback.format_exc()
            self.logger.error(error)
            self._emit_finished(False, error)
            self.signals.stop_undefined_range.emit()
            return

        self.signals.stop_undefined_range.emit()
        self._emit_finished(True, "")

    # ----------------------------
    # Linux (user-local uninstall)
    # ----------------------------
    def _uninstall_linux_user(self) -> None:
        """Remove the desktop entry, the payload directory and the stored settings."""
        # The directory where stuff is currently installed to
        install_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppLocalDataLocation)

        apps_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.ApplicationsLocation)
        if not apps_dir:
            apps_dir = os.path.expanduser("~/.local/share/applications")
        desktop_path = os.path.join(apps_dir, f"{self.app_id}.desktop")

        # Remove desktop entry
        safe_unlink(desktop_path)

        # Remove payload files (best effort)
        if install_dir and os.path.isdir(install_dir):
            safe_rmtree(install_dir)  # Delete the entire directory

        # Clear settings keys / file
        self._clear_qt_settings()
        self.logger.info(f"Uninstalled (Linux). Removed: {desktop_path} and {install_dir}")

    # ----------------------------
    # Windows (user-local uninstall)
    # ----------------------------
    def _uninstall_windows_user(self) -> None:
        """Remove the Start Menu shortcut and settings, then schedule deletion of the payload directory."""
        install_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppLocalDataLocation)

        # Start Menu Programs folder via Qt
        start_menu_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.ApplicationsLocation)
        if not start_menu_dir:
            start_menu_dir = os.path.join(os.getenv("APPDATA", ""), "Microsoft", "Windows", "Start Menu", "Programs")
        shortcut_path = os.path.join(start_menu_dir, f"{self.app_name}.lnk")

        # Remove shortcut now (usually possible even while app runs)
        safe_unlink(shortcut_path)

        # Clear settings first
        self._clear_qt_settings()

        # Now remove install_dir.
        # On Windows you typically cannot delete the running exe, so we spawn a helper bat
        # that waits for this process to exit, then deletes the folder.
        if install_dir and os.path.isdir(install_dir):
            self._spawn_windows_cleanup_bat(
                pid=os.getpid(),
                install_dir=install_dir,
                shortcut_path=shortcut_path,
            )
        self.logger.info(f"Uninstall scheduled (Windows). Shortcut removed: {shortcut_path}, install_dir: {install_dir}")

    def _clear_qt_settings(self) -> None:
        """
        Clear the app's Qt settings and delete the backing INI file for a clean uninstall.

        Failures are logged as warnings; they never fail the uninstall.
        """
        try:
            s = make_settings(portable=False)
            s.clear()
            s.sync()

            try:
                fname = s.fileName()
            except Exception:
                # The file location is unknown. Skip the deletion instead of guessing a relative
                # "config.ini", which would resolve against the working directory and could
                # remove an unrelated (e.g. portable) configuration file.
                fname = ""

            if fname and os.path.exists(fname):
                safe_unlink(fname)
        except Exception as e:
            # Don't fail uninstall if settings cleanup fails
            self.logger.warning(f"Settings cleanup failed: {e}")

    def _spawn_windows_cleanup_bat(self, pid: int, install_dir: str, shortcut_path: str) -> None:
        """
        Creates and runs a .bat that waits until PID exits, then deletes install_dir.
        The .bat deletes itself at the end.

        :param pid: Process id the script waits for (polled once per second via tasklist)
        :param install_dir: Directory removed with ``rmdir /s /q``
        :param shortcut_path: Shortcut deleted again by the script (best effort)
        """
        bat_path = os.path.join(tempfile.gettempdir(), f"{self.app_id}_uninstall_cleanup.bat")

        # Use rmdir /s /q for full folder wipe
        # "tasklist /FI" loop waits until PID is gone
        script = rf"""@echo off
setlocal ENABLEDELAYEDEXPANSION
REM Wait for the app process to exit
:loop
tasklist /FI "PID eq {pid}" 2>NUL | find /I "{pid}" >NUL
if "%ERRORLEVEL%"=="0" (
timeout /t 1 /nobreak >NUL
goto loop
)
REM Remove shortcut (best effort)
del /f /q "{shortcut_path}" >NUL 2>&1
REM Remove install folder
rmdir /s /q "{install_dir}" >NUL 2>&1
REM Self-delete
del /f /q "%~f0" >NUL 2>&1
endlocal
"""
        with open(bat_path, "w", encoding="utf-8") as f:
            f.write(script)

        # Launch hidden; the flag only exists on Windows builds of the subprocess module
        creationflags = getattr(subprocess, "CREATE_NO_WINDOW", 0)
        subprocess.Popen(
            ["cmd.exe", "/c", bat_path],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            creationflags=creationflags,
            close_fds=True,
        )
class AutoUpdateThread(QRunnable):
    """
    Downloads the release that matches the current OS / architecture and replaces the running binary.

    The release information is fetched from the update endpoint; the asset is looked up under the key
    ``download_<os_arch>``. Progress and messages (including the final success message) are reported
    through ``self.signals``.
    """

    def __init__(self) -> None:
        super(AutoUpdateThread, self).__init__()
        self.signals: Signals = Signals()
        self.assets: dict = {}  # JSON payload of the update endpoint
        self.logger = setup_logger(name="Porn Fetch - [AutoUpdateThread]", log_file="PornFetch.log", level=logging.DEBUG)

    def run(self):
        """Fetch the release information, download the matching asset and swap the binary."""
        self.signals.start_undefined_range.emit()
        self.logger.info("Fetching release information...")
        url = "https://echteralsfake.me/update"
        response: httpx.Response = clients.core.fetch(url=url, get_response=True)

        if response.status_code == 200:
            self.assets = response.json()
            # Single quotes inside the f-string: reusing the outer quote type only parses on Python 3.12+
            self.logger.info(f"Got Update Information for: {self.assets['version']}")

        elif response.status_code in (500, 502, 530):
            self.logger.error("Server is currently unable to return the update information. Please try again later...")
            self.signals.stop_undefined_range.emit()  # Don't leave the busy indicator running
            # NOTE(review): ui_popup is called from a worker here while every other message in this
            # class goes through error_signal - confirm it is safe outside the GUI thread.
            ui_popup("Server is currently unable to return the update information. Please try again later...")
            return

        self.logger.info("Starting auto-update process...")
        os_arch = get_os_and_arch()
        download_url = self.assets.get(f"download_{os_arch}")

        if not download_url:
            self.logger.error(f"No download URL found for {os_arch}")
            self.signals.stop_undefined_range.emit()
            self.signals.error_signal.emit(f"Update failed: No download available for your system ({os_arch}).")
            return

        self.logger.info(f"Downloading update from: {download_url}")
        temp_dir = tempfile.gettempdir()
        filename = download_url.split("/")[-1]
        download_path = os.path.join(temp_dir, filename)

        try:
            clients.core.legacy_download(
                url=download_url,
                path=download_path,
                callback=self.update_progress
            )
            self.logger.info("Download complete. Replacing binary.")
            self.replace_binary(download_path)
            self.logger.info("Update successful. Please restart the application.")
            # error_signal doubles as the generic message channel here
            self.signals.error_signal.emit("Update successful! Please restart the application.")

        except Exception as e:
            self.logger.error(f"Update failed: {e}")
            self.signals.stop_undefined_range.emit()
            self.signals.error_signal.emit(f"Update failed: {e}")

    def update_progress(self, current: int, total: int) -> None:
        """Download callback: publish the range first so the value is applied against the right maximum."""
        self.signals.total_progress_range.emit(total)
        self.signals.total_progress.emit(current)

    def replace_binary(self, new_binary_path: str) -> None:
        """
        Put the downloaded file in place of the current executable.

        :raises RuntimeError: when the path of the running executable cannot be determined
        """
        current_binary_path = get_original_executable_path()
        if not current_binary_path:
            raise RuntimeError("Could not determine the path of the current executable.")

        # On Windows, you can't replace a running executable.
        # A common strategy is to use a helper script.
        if sys.platform == "win32":
            self.create_windows_updater(current_binary_path, new_binary_path)
        else:
            # On Linux/macOS, you can often replace the binary directly.
            os.chmod(new_binary_path, 0o755)
            shutil.move(new_binary_path, current_binary_path)

    def create_windows_updater(self, current_path: str, new_path: str) -> None:
        """
        Write and start ``updater.bat`` in the temp directory, then quit the application.

        The script kills the running image, moves the new file over the old one, starts it again
        and deletes itself. The image name is quoted so names containing spaces work.
        """
        updater_script_path = os.path.join(tempfile.gettempdir(), "updater.bat")
        with open(updater_script_path, "w") as f:
            f.write(f"""
@echo off
echo Waiting for application to close...
taskkill /F /IM "{os.path.basename(current_path)}"
timeout /t 2 /nobreak
echo Replacing application file...
move /Y "{new_path}" "{current_path}"
echo Starting new version...
start "" "{current_path}"
del "%~f0"
""")
        subprocess.Popen([updater_script_path], creationflags=subprocess.CREATE_NO_WINDOW)
        QCoreApplication.quit()
class CheckUpdates(QRunnable):
    """
    Asks the update endpoint for the latest version and emits ``update_check`` with
    ``(is_newer_available, payload)``. Nothing is emitted when the server can't be reached,
    answers with an error status or returns an unusable payload.
    """

    def __init__(self):
        super(CheckUpdates, self).__init__()
        self.signals: Signals = Signals()
        self.logger = shared_functions.setup_logger(name="Porn Fetch - [CheckUpdates]", log_file="PornFetch.log", level=logging.DEBUG,)

    def run(self):
        """Fetch the release information and compare its version with ``__version__``."""
        url = "https://echteralsfake.me/update"

        try:
            response: httpx.Response = clients.core.fetch(url=url, get_response=True)
            status = response.status_code

            if status == 200:
                json_stuff = response.json()
                try:
                    # strip() treats its argument as a set of characters, not as a prefix. That is
                    # harmless only because a numeric version never starts or ends with one of them.
                    version = str(json_stuff["version"]).strip("latest - ")
                    # NOTE(review): float comparison orders "3.10" below "3.9" - revisit once a
                    # version with a two-digit minor part is published.
                    update_available = float(version) > float(__version__)
                except (KeyError, TypeError, ValueError) as e:
                    # Malformed payload: log it instead of letting the exception escape the worker
                    self.logger.error(f"Could not parse the update information: {e}")
                    return

                if update_available:
                    self.logger.info(f"A new update is available -->: {version}")
                    self.signals.update_check.emit(True, json_stuff)
                else:
                    self.logger.info("Checked for updates... You are on the latest version :)")
                    self.signals.update_check.emit(False, json_stuff)

            elif status == 404:
                self.logger.error("Temporary error reaching the server")
                return

            elif status == 500:
                self.logger.error("Internal Server error, probably already fixing it :) ")
                return

            elif status == 530 or status == 502:
                self.logger.error("Server is currently offline. Proobably already fixing it :)")

        # httpx raises its own hierarchy for transport problems (not the builtin ConnectionError),
        # so it has to be listed explicitly.
        except (ConnectionError, ConnectionResetError, ConnectionRefusedError, TimeoutError, httpx.HTTPError):
            handle_error_gracefully(self, data=video_data.consistent_data, error_message="I could NOT check for updates. The server is either not reachable, or you don't have an IPv6 connection.")
class AddToTreeWidget(QRunnable):
def __init__(self, iterator: Iterable[clients.AnyVideoClass], is_checked: bool, last_index: int, custom_options: str):
    """
    Store the iterator plus the options that are needed to load its videos into the tree widget.

    :param iterator: The video iterator (Search or model object)
    :param is_checked: Whether the "do not clear videos" checkbox is checked
    :param last_index: Last index (video) of the tree widget, to maintain a correct order of numbers
    :param custom_options: Passed on to ``clients.load_video_attributes`` for every video
    """
    super(AddToTreeWidget, self).__init__()
    self.signals: Signals = Signals()  # Processing signals for progress and information

    # Arguments handed in by the caller
    self.iterator = iterator
    self.stop_flag = stop_flag  # Module-level event: set when the user pressed the stop process button
    self.is_checked = is_checked
    self.last_index = last_index
    self.custom_options = custom_options

    # Snapshot of the shared settings this worker reads
    shared = video_data.consistent_data
    self.consistent_data = shared
    self.output_path = shared.get("output_path")
    self.result_limit = shared.get("result_limit")
    self.supress_errors = shared.get("supress_errors")
    self.activate_logging = shared.get("activate_logging")

    self.logger = shared_functions.setup_logger(
        name="Porn Fetch - [AddToTreeWidget]",
        log_file="PornFetch.log",
        level=logging.DEBUG,
    )
def process_video(self, video, index):
if isinstance(video, str):
report = video
else:
try:
report = video.url
except: # Don't ask
report = "If you can read this, then you know I fucked up badly. Please call ChatGPT and ask it to fix this code"
if not isinstance(video, str):
self.logger.debug(f"Requesting video processing of: {video.url}")
else:
self.logger.debug(f"Requesting video processing of: {video}")
for attempt in range(0, 5):
try:
video_identifier = random.randint(0, 99999999) # Creates a random ID for each video
if isinstance(video, str):
video = clients.check_video(url=video)
self.logger.info(f"[Download (3/10) - Video ID] -->: {video_identifier}")
data = clients.load_video_attributes(video, self.custom_options)
self.logger.debug("[Download (4/10) - Fetched Attributes")
session_urls.append(video.url)
title = clients.core.strip_title(data.title)
rendered_name = data.output_name
if self.consistent_data.get(
"directory_system"): # If the directory system is enabled, this will create an additional folder
author_path = os.path.join(self.output_path, data.author)
os.makedirs(author_path, exist_ok=True)
output_path = os.path.join(str(author_path), rendered_name + ".mp4")
else:
output_path = os.path.join(self.output_path, rendered_name + ".mp4")
# Emit the loaded signal with all the required information
data.title = title
data.output_name = output_path
data.index = index
data.video = video
video_data.data_objects.update({video_identifier: data})
self.logger.info(f"[Download (5/10) - Finished Processing]")
return video_identifier
except (ph_errors.PremiumVideo, IndexError):
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"Premium-only video skipped: {report}")
return False
except ph_errors.RegionBlocked:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"Region-blocked video skipped: {report}")
return False
except ph_errors.VideoDisabled:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"Warning: The video {report} is disabled. It will be skipped")
except ph_errors.RegexError:
message = f"""
A regex error occurred. This is always a 50/50 chance if it's my or PornHub's fault. If this happens again on
the same video, please consider reporting it. If you have logging enabled, this issue will automatically be reported.
Current Index: {index}
Additional Info: URL: {report}
"""
self.logger.error("Regex error occurred, sleeping one second...")
time.sleep(1)
if attempt == 4:
self.logger.info("Nevermind, didn't work lmao")
handle_error_gracefully(self, data=video_data.consistent_data, error_message=message, needs_network_log=True)
return False
except ph_errors.VideoPendingReview:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"Warning: The video {report} is pending review. It will be skipped")
return False
except InvalidResponse:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"Warning: The video: {report} returned an empty response when trying"
f"to fetch its content. There is nothing I can do. It will be skipped")
return False
except NotAvailable_HQ:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"The video: {report} is not available, because the CDN network has an issue. "
f"This is not my fault, please do NOT report this error, thank you :)")
return False
except WeirdError_HQ:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"An error happened with: {report} this is a weird error i have no fix for yet,"
f" however this will be reported, to help me fixing it :) ", needs_network_log=True)
return False
except VideoUnavailable_XV:
handle_error_gracefully(self, data=video_data.consistent_data, error_message= f"The video {report} is not available. Do not report this error... Not my fault :)")
return False
except NotAvailable_EP:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"The video: {report} is not available on HQPorner. This is not my fault, skipping...")
return False
except VideoDisabled_EP:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"The video: {report} has been disabled by EPorner itself. It will be skippled...")
return False
except VideoError_PH:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"The video: {report} has an error. However, in this case it's PornHub's fault. It will be skipped!")
return False
except RegionBlocked_YP:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"The video: {report} is region locked. It will be skipped...")
except VideoUnavailable_YP:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"The video: {report} is unavailable on YouPorn. It will be skipped...")
except ProxySSLError:
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"SSL Error, are you are you using a public network?")
except Exception:
error = traceback.format_exc()
handle_error_gracefully(self, data=video_data.consistent_data, error_message=f"Unexpected error occurred: {error}", needs_network_log=True)
return False
return None
def run(self):
self.signals.start_undefined_range.emit() # Starts the progressbar, but with a loading animation
is_first = True # see down below for an explanation
if isinstance(self.iterator, str):
self.iterator = [self.iterator]
if not self.is_checked:
self.signals.clear_tree_widget_signal.emit() # Clears the tree widget
if self.is_checked:
start = self.last_index
self.result_limit += start
else:
start = 1
self.logger.debug(f"Result Limit: {str(self.result_limit)}")
videos = islice(self.iterator, self.result_limit)
for i, video in enumerate(videos, start=start):
if self.stop_flag.is_set():
self.signals.tree_widget_finished.emit()
return # Stop processing if user pressed the stop button
if i >= self.result_limit + 1:
break # Respect search limit
video_id = self.process_video(video, i) # Passes video and index object / int
if video_id is False:
self.logger.warning(f"Skipping Video: {video}")
continue
if is_first:
self.signals.total_progress_range.emit(self.result_limit)
is_first = False # Otherwise the total would be sent every time which creates overhead
self.signals.total_progress.emit(i)
self.signals.text_data_to_tree_widget.emit(video_id)
self.logger.debug("Finished Iterating")
self.signals.tree_widget_finished.emit()
class DownloadScheduler(QObject):
    """Queues download jobs and starts them on a thread pool with a concurrency cap.

    Jobs are kept in a FIFO ``deque``. A ``QSemaphore`` created with
    ``max_concurrent`` permits decides how many workers may run at once; one
    permit is taken per started worker and released again in ``_on_done``.
    """

    worker_started = Signal(int, object)  # Emitted as (video_id, worker) right before the worker is started

    def __init__(self, threadpool, max_concurrent: int, parent=None):
        """Remember the pool and create the semaphore and the (empty) job queue.

        Args:
            threadpool: The pool the workers are started on (a QThreadPool from the GUI, per the original notes).
            max_concurrent: Number of permits of the semaphore, i.e. the maximum number of concurrent downloads.
            parent: Optional QObject parent, forwarded to ``QObject.__init__``.
        """
        super().__init__(parent)
        self.pool = threadpool
        self.sem = QSemaphore(max_concurrent)
        self.queue = deque()  # Jobs that are waiting for a free permit

    def enqueue(self, video_obj, video_id: int, quality, stop_event, segment_dir: str, segment_state_path: str, cleanup_on_stop: bool):
        """Append a download job to the queue and try to start it right away.

        Args:
            video_obj: The actual video object.
            video_id: Unique ID of the video, used to access its stored data.
            quality: The selected download quality.
            stop_event: Forwarded to the worker; presumably set by the stop button to abort the download (verify against caller).
            segment_dir: Forwarded to the worker unchanged.
            segment_state_path: Forwarded to the worker unchanged.
            cleanup_on_stop: Forwarded to the worker unchanged.
        """
        job = (video_obj, video_id, quality, stop_event, segment_dir, segment_state_path, cleanup_on_stop)
        self.queue.append(job)
        self._try_start()

    def _try_start(self):
        """Start queued jobs for as long as both a job and a free permit are available."""
        while True:
            if not self.queue:
                return  # Nothing left to start
            if not self.sem.tryAcquire(1):
                return  # All permits are in use; `_on_done` will call us again

            # Take the oldest job off the queue and unpack it in the order `enqueue` stored it
            job = self.queue.popleft()
            video, job_id, chosen_quality, stop_signal, seg_dir, seg_state_path, cleanup = job

            worker = DownloadThread(
                video=video,
                video_id=job_id,
                quality=chosen_quality,
                stop_event=stop_signal,
                segment_dir=seg_dir,
                segment_state_path=seg_state_path,
                cleanup_on_stop=cleanup,
            )
            # When the worker reports completion, free its permit and start the next job
            worker.signals.download_completed.connect(self._on_done)
            # Announce the worker before it runs so listeners can hook into it
            self.worker_started.emit(job_id, worker)
            self.pool.start(worker)
            logger.info(f"[Download (9/10) - Started Download]")

    @Slot(int)
    def _on_done(self, video_id: int):
        """Release the finished worker's permit and start the next queued job, if any."""
        self.sem.release(1)
        self._try_start()
class ThumbnailFetcher(QRunnable):
    """Worker that downloads the thumbnail of one stored video.

    The video is looked up in ``video_data.data_objects`` by its identifier; on
    success the raw bytes are stored on that entry as ``thumbnail_data``.
    Failures are logged and never raised out of ``run``.
    """

    def __init__(self, identifier: int):
        """Remember which video to fetch the thumbnail for and set up the logger.

        Args:
            identifier: Key of the video in ``video_data.data_objects``.
        """
        super().__init__()
        self.identifier = identifier
        self.logger = setup_logger(name="Porn Fetch - [ThumbnailFetcher]", level=logging.DEBUG, log_file="PornFetch.log")

    def run(self):
        """Fetch the thumbnail bytes and attach them to the stored video data."""
        data: VideoAttributes = video_data.data_objects.get(self.identifier)
        if data is None:
            # `.get()` returns None for unknown identifiers; bail out instead of raising AttributeError in the worker
            self.logger.error(f"No video data found for identifier: {self.identifier}. Cannot fetch thumbnail.")
            return

        title = data.title
        video = data.video
        thumbnail = data.thumbnail

        if not thumbnail:
            return  # Nothing to fetch for this video

        self.logger.info(f"Trying to fetch thumbnail for Video: {title}")
        # NOTE(review): this is the shared `clients.core` object, not a copy - the Referer header set below
        # stays on its session afterwards. Confirm this is intended.
        temp_core = clients.core

        # Pick the Referer header based on the site the video URL belongs to
        if "hqporner" in video.url:
            temp_core.session.headers["Referer"] = "https://hqporner.com/"

        elif "pornhub" in video.url:
            temp_core.session.headers["Referer"] = "https://pornhub.com/"

        elif "youporn" in video.url:
            temp_core.session.headers["Referer"] = "https://youporn.com/"

        if not thumbnail.startswith("http"):
            # Strip leading slashes so protocol-relative URLs ("//host/img.jpg") do not become "https:////host/img.jpg"
            thumbnail = f"https://{thumbnail.lstrip('/')}"

        try:
            thumbnail_data = temp_core.fetch(thumbnail, get_bytes=True)
            self.logger.info(f"Successfully fetched thumbnail for: {title}")
            video_data.data_objects[self.identifier].thumbnail_data = thumbnail_data

        except Exception:
            # Best effort: a missing thumbnail must not break anything, so only log the failure
            error = traceback.format_exc()
            self.logger.error(f"Couldn't fetch Thumbnail: {thumbnail} for -->: {title} ERROR: {error}")
class DownloadThread(QRunnable):
"""Refactored threading class to download videos with improved performance and logic."""
def __init__(self, video, video_id, quality, stop_event, segment_state_path, segment_dir, cleanup_on_stop: bool):
super().__init__()
self.video: clients.AllowedVideoType = video
self.video_id = video_id
self.consistent_data = video_data.consistent_data
self.quality = quality