-
-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathutils.py
More file actions
334 lines (264 loc) · 10.5 KB
/
utils.py
File metadata and controls
334 lines (264 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
import asyncio
import os
import shutil
import subprocess # noqa: S404
import sys
from collections.abc import Callable, Iterable, Sequence
from enum import IntEnum
from functools import partial
from itertools import chain
from platform import version
from threading import Thread
from typing import TYPE_CHECKING, Any, TypeAlias, TypedDict, TypeGuard, TypeVar
import cv2
import numpy as np
from cv2.typing import MatLike
from gen.build_vars import AUTOSPLIT_BUILD_NUMBER, AUTOSPLIT_GITHUB_REPOSITORY
if sys.platform == "win32":
import ctypes
import ctypes.wintypes
from _ctypes import COMError # noqa: PLC2701 # comtypes is untyped
import win32gui
import win32ui
from pygrabber.dshow_graph import FilterGraph
STARTUPINFO: TypeAlias = subprocess.STARTUPINFO
else:
STARTUPINFO: TypeAlias = None
if sys.platform == "linux":
import fcntl
from pyscreeze import RUNNING_WAYLAND as RUNNING_WAYLAND # noqa: PLC0414
else:
RUNNING_WAYLAND = False
if TYPE_CHECKING:
# Source does not exist, keep this under TYPE_CHECKING
from _win32typing import PyCDC # pyright: ignore[reportMissingModuleSource]
T = TypeVar("T")
def find_tesseract_path():
    """
    Look up the tesseract executable.

    The lookup uses the `PATH` environment variable (or `os.defpath` when `PATH` is unset).
    On Windows, the two default Tesseract-OCR install folders are appended to that list.
    @return: Whatever `shutil.which` finds for `TESSERACT_EXE`; `None` if nothing matches.
    """
    lookup_dirs = os.environ.get("PATH", os.defpath)
    if sys.platform == "win32":
        # Also look in the default install locations, which may not be on PATH.
        lookup_dirs = lookup_dirs + r";C:\Program Files\Tesseract-OCR;C:\Program Files (x86)\Tesseract-OCR"
    return shutil.which(TESSERACT_EXE, path=lookup_dirs)
# Name of the tesseract executable that `find_tesseract_path` searches for.
TESSERACT_EXE = "tesseract"
# Resolved once, when this module is imported.
TESSERACT_PATH = find_tesseract_path()
"""The path to execute tesseract. `None` if it can't be found."""
# Full command line used by `run_tesseract`.
# Falls back to the bare executable name when no path could be resolved.
# NOTE(review): the two "-" arguments presumably select stdin/stdout as input/output — confirm with the tesseract CLI docs.
TESSERACT_CMD = (TESSERACT_PATH or TESSERACT_EXE, "-", "-", "--oem", "1", "--psm", "6")
# Attribute identifier passed to `DwmGetWindowAttribute` in `get_window_bounds`.
DWMWA_EXTENDED_FRAME_BOUNDS = 9
# NOTE(review): presumably the maximum value of an unsigned 8-bit channel — confirm against usages.
MAXBYTE = 255
ONE_SECOND = 1000
"""1000 milliseconds in 1 second"""
BGR_CHANNEL_COUNT = 3
"""How many channels in a BGR image"""
BGRA_CHANNEL_COUNT = 4
"""How many channels in a BGRA image"""
class ImageShape(IntEnum):
    """
    Named positions for indexing a shape tuple.

    NOTE(review): presumably meant for the `shape` of an image array — confirm with usages.
    """

    # Vertical dimension
    Y = 0
    # Horizontal dimension
    X = 1
    # Channel dimension
    Channels = 2
class ColorChannel(IntEnum):
    """Named positions of each color channel, in Blue, Green, Red, Alpha order."""

    # Channel indices, in storage order
    Blue = 0
    Green = 1
    Red = 2
    Alpha = 3
class SubprocessKWArgs(TypedDict):
stdin: int
stdout: int
stderr: int
startupinfo: "STARTUPINFO | None"
env: os._Environ[str] | None # pyright: ignore[reportPrivateUsage]
def decimal(value: float):
    """
    Format `value` truncated (not rounded) to exactly two decimal places.

    @param value: The number to format.
    @return: The truncated number as a string with two digits after the decimal point.
    """
    # Truncate before formatting so the result is never rounded up (0.999 stays "0.99").
    truncated = int(value * 100) / 100
    # `.2f` only has to pad with zeros here, and does so for any magnitude or sign.
    # The previous `ljust(4, "0")` assumed a single character before the decimal point,
    # so 10.5 came out as "10.5" and -0.5 as "-0.5".
    return f"{truncated:.2f}"
def is_digit(value: str | int | None):
"""Checks if `value` is a single-digit string from 0-9."""
if value is None:
return False
try:
return 0 <= int(value) <= 9 # noqa: PLR2004
except (ValueError, TypeError):
return False
def is_valid_image(image: MatLike | None) -> TypeGuard[MatLike]:
    """Tell whether `image` is an actual image: not `None` and with a non-zero `size`."""
    if image is None:
        return False
    return bool(image.size)
def is_valid_hwnd(hwnd: int):
    """
    Check that `hwnd` refers to a usable window.

    A falsy handle is never valid. On Windows the handle must additionally
    be an existing window that has a non-empty title, which rules out
    the desktop or whatever window is obtained with `""`.
    On other platforms any truthy handle is accepted.
    """
    if not hwnd:
        return False
    if sys.platform != "win32":
        return True
    return bool(win32gui.IsWindow(hwnd) and win32gui.GetWindowText(hwnd))
def first(iterable: Iterable[T]) -> T:
    """
    Get the first element of a collection. Dictionaries will give their first key.

    @param iterable: Anything that can be iterated over.
    @return: The first item produced when iterating over `iterable`.
    """
    iterator = iter(iterable)
    return next(iterator)
def try_delete_dc(dc: "PyCDC"):
    """
    Delete the given device context, ignoring `win32ui.error` if the deletion fails.

    Raises `OSError` on any platform other than Windows.
    """
    if sys.platform != "win32":
        raise OSError
    try:
        dc.DeleteDC()
    except win32ui.error:
        # Best effort: a failed deletion is deliberately not reported.
        return
def get_window_bounds(hwnd: int) -> tuple[int, int, int, int]:
    """
    Compare a window's extended frame bounds (from DWM) against its window rect.

    @param hwnd: Handle of the window to measure.
    @return: `(left, top, width, height)` where `left` and `top` are the offsets
    of the extended frame from the window rect's left/top edges,
    and `width` and `height` are the dimensions of the extended frame.

    Raises `OSError` on any platform other than Windows.
    """
    if sys.platform != "win32":
        raise OSError

    # Filled in-place by DwmGetWindowAttribute
    frame = ctypes.wintypes.RECT()
    ctypes.windll.dwmapi.DwmGetWindowAttribute(
        hwnd,
        DWMWA_EXTENDED_FRAME_BOUNDS,
        ctypes.byref(frame),
        ctypes.sizeof(frame),
    )

    rect_left, rect_top = win32gui.GetWindowRect(hwnd)[:2]
    return (
        frame.left - rect_left,
        frame.top - rect_top,
        frame.right - frame.left,
        frame.bottom - frame.top,
    )
# Note: maybe reorganize capture_method module to have
# different helper modules and a methods submodule
def get_input_device_resolution(index: int) -> tuple[int, int] | None:
if sys.platform != "win32":
return (0, 0)
filter_graph = FilterGraph()
try:
filter_graph.add_video_input_device(index)
# This can happen with virtual cameras throwing errors.
# For example since OBS 29.1 updated FFMPEG breaking VirtualCam 3.0
# https://github.com/Toufool/AutoSplit/issues/238
except COMError:
return None
try:
resolution = filter_graph.get_input_device().get_current_format()
# For unknown reasons, some devices can raise "ValueError: NULL pointer access".
# For instance, Oh_DeeR's AVerMedia HD Capture C985 Bus 12
except ValueError:
return None
finally:
filter_graph.remove_filters()
return resolution
def open_file(file_path: str | bytes | os.PathLike[str] | os.PathLike[bytes]):
    """
    Open `file_path` through the operating system.

    Uses `os.startfile` on Windows, the `xdg-open` command on Linux,
    and the `open` command on any other platform.
    """
    if sys.platform == "win32":
        os.startfile(file_path)  # noqa: S606
        return

    opener = "open"
    if sys.platform == "linux":
        opener = "xdg-open"
    subprocess.call([opener, file_path])  # noqa: S603
def get_or_create_eventloop():
    """
    Get the current asyncio event loop.

    If `asyncio.get_event_loop` raises a `RuntimeError`,
    a new loop is created and set as the current one before asking again.
    """
    try:
        current_loop = asyncio.get_event_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())
        current_loop = asyncio.get_event_loop()
    return current_loop
def try_input_device_access():
    """
    Same as `make_uinput` in `keyboard/_nixcommon.py`.

    @return: `True` if `/dev/uinput` could be opened for writing and accepted the ioctl call.
    `False` if that raised an `OSError`, or on any platform other than Linux.
    """
    if sys.platform != "linux":
        return False

    UI_SET_EVBIT = 0x40045564  # noqa: N806
    try:
        with open("/dev/uinput", "wb") as uinput:
            fcntl.ioctl(uinput, UI_SET_EVBIT)
    except OSError:
        return False
    else:
        return True
def fire_and_forget(func: Callable[..., Any]):
    """
    Decorator that starts a synchronous function in the background
    and returns immediately, without waiting for its result.

    On Windows a new thread is started, because asyncio may have issues there.
    Unsure, it was originally
    ~~`RuntimeError: There is no current event loop in thread 'MainThread'`~~.
    See alpha.5 and https://github.com/Avasam/AutoSplit/issues/36

    Everywhere else the event loop's default executor is used,
    because of a `Segmentation fault (core dumped)` on Linux.

    @return: A wrapper giving back the started `Thread` on Windows,
    or the future produced by `run_in_executor` elsewhere.
    """

    def wrapped(*args: Any, **kwargs: Any):
        if sys.platform != "win32":
            loop = get_or_create_eventloop()
            return loop.run_in_executor(None, partial(func, *args, **kwargs))

        worker = Thread(target=func, args=args, kwargs=kwargs)
        worker.start()
        return worker

    return wrapped
def flatten(nested_iterable: Iterable[Iterable[T]]) -> chain[T]:
    """
    Remove one level of nesting.

    @param nested_iterable: An iterable whose items are themselves iterables.
    @return: A lazy `chain` over the items of each inner iterable, in order.
    """
    flattened = chain.from_iterable(nested_iterable)
    return flattened
def imread(filename: str, flags: int = cv2.IMREAD_COLOR):
    """
    Read an image by loading the file's bytes with numpy and decoding them with OpenCV.

    @param filename: Path of the image file to read.
    @param flags: Decoding flags forwarded to `cv2.imdecode`.
    @return: Whatever `cv2.imdecode` produces for the file's content.
    """
    raw_bytes = np.fromfile(filename, dtype=np.uint8)
    return cv2.imdecode(raw_bytes, flags)
def imwrite(filename: str, img: MatLike, params: Sequence[int] = ()):
    """
    Encode an image with OpenCV and write the encoded bytes to `filename`.

    @param filename: Destination path. Its extension selects the encoding.
    @param img: The image to encode.
    @param params: Encoding parameters forwarded to `cv2.imencode`.

    Raises `OSError` if OpenCV reports that the encoding failed.
    """
    extension = os.path.splitext(filename)[1]
    success, encoded_img = cv2.imencode(extension, img, params)
    if success:
        encoded_img.tofile(filename)
        return
    raise OSError(f"cv2 could not write to path {filename}")
def subprocess_kwargs():
    """
    Build keyword arguments that let a ``subprocess.Popen`` (and variants)
    call work with or without Pyinstaller, ``--noconsole`` or not,
    on Windows and Linux.

    Typical use:
    ```python
    subprocess.call(["program_to_run", "arg_1"], **subprocess_kwargs())
    ```
    ---
    Originally found in https://github.com/madmaze/pytesseract/blob/master/pytesseract/pytesseract.py
    Recipe from https://github.com/pyinstaller/pyinstaller/wiki/Recipe-subprocess
    which itself is taken from https://github.com/bjones1/enki/blob/master/enki/lib/get_console_output.py
    """
    # Defaults for every platform except Windows.
    startupinfo = None
    env = None

    if sys.platform == "win32":
        # On Windows, subprocess calls will pop up a command window by default when run from
        # Pyinstaller with the ``--noconsole`` option. Avoid this distraction.
        startupinfo = STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        # https://github.com/madmaze/pytesseract/blob/88839f03590578a10e806a5244704437c9d477da/pytesseract/pytesseract.py#L236
        startupinfo.wShowWindow = subprocess.SW_HIDE
        # Windows doesn't search the path by default. Pass it an environment so it will.
        env = os.environ

    # On Windows, running this from the binary produced by Pyinstaller
    # with the ``--noconsole`` option requires redirecting everything
    # (stdin, stdout, stderr) to avoid an OSError exception
    # "[Error 6] the handle is invalid."
    return SubprocessKWArgs(
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        startupinfo=startupinfo,
        env=env,
    )
def run_tesseract(png: bytes):
    """
    Run the tesseract CLI, feeding it a PNG encoded image through its standard input.

    @param png: PNG encoded image as byte array
    @return: The decoded text that tesseract wrote to its standard output.
    """
    process = subprocess.Popen(  # noqa: S603 # Only using known literal strings
        TESSERACT_CMD,
        **subprocess_kwargs(),
    )
    stdout_data = process.communicate(input=png)[0]
    return stdout_data.decode()
def list_processes():
    """
    List the names of the running processes.

    On Windows this is the first whitespace-separated token of each row printed by `tasklist.exe`.
    On other platforms it is each line printed by `ps -eo comm`.
    @return: A list of process names.
    """
    if sys.platform != "win32":
        ps_output = subprocess.check_output(  # noqa: S603 # Known input
            ("ps", "-eo", "comm"),
            text=True,
        )
        # Skip the header line
        return ps_output.splitlines()[1:]

    tasklist_output = subprocess.check_output(  # Known input
        "C:/Windows/System32/tasklist.exe",
        text=True,
    )
    # Skip the table header lines
    rows = tasklist_output.splitlines()[3:]
    # The first column is the process name
    return [row.split()[0] for row in rows if row]
# Environment specifics
# Last dot-separated component of `platform.version()` on Windows; -1 on every other platform.
WINDOWS_BUILD_NUMBER = int(version().split(".")[-1]) if sys.platform == "win32" else -1
FIRST_WIN_11_BUILD = 22000
# NOTE(review): presumably the lowest Windows build on which Windows Graphics Capture is available — confirm with the link below.
WGC_MIN_BUILD = 17134
"""https://docs.microsoft.com/en-us/uwp/api/windows.graphics.capture.graphicscapturepicker#applies-to"""
# True when the `sys` module has a `frozen` attribute.
FROZEN = hasattr(sys, "frozen")
"""Running from build made by PyInstaller"""
# Folder of the running executable when frozen, otherwise the folder containing this file.
auto_split_directory = os.path.dirname(sys.executable if FROZEN else os.path.abspath(__file__))
"""The directory of either the AutoSplit executable or AutoSplit.py"""
# Shared strings
# Check `excludeBuildNumber` during a workflow dispatch build to generate a clean version number.
# The build number suffix is only appended when `AUTOSPLIT_BUILD_NUMBER` is truthy.
AUTOSPLIT_VERSION = "2.3.2" + (f"-{AUTOSPLIT_BUILD_NUMBER}" if AUTOSPLIT_BUILD_NUMBER else "")
"""AutoSplit Version number"""
GITHUB_REPOSITORY = AUTOSPLIT_GITHUB_REPOSITORY