From 34430b3509f6eaf6e15187ce2a124632c9d14ec3 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 01:30:16 +0900 Subject: [PATCH 01/18] [proof of concept] async model file listing --- folder_paths.py | 90 ++++++++++++++++++++++++++++++++++-------------- requirements.txt | 1 + 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index 38fad6238a61..e05c12595076 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio, aiofiles, threading import os import time import mimetypes @@ -7,6 +8,8 @@ from typing import Set, List, Dict, Tuple, Literal from collections.abc import Collection +import aiofiles.os + supported_pt_extensions: set[str] = {'.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl', '.sft'} folder_names_and_paths: dict[str, tuple[list[str], set[str]]] = {} @@ -194,23 +197,36 @@ def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) logging.warning(f"Warning: Unable to access {directory}. Skipping this path.") logging.debug("recursive file list on directory {}".format(directory)) - dirpath: str - subdirs: list[str] - filenames: list[str] - - for dirpath, subdirs, filenames in os.walk(directory, followlinks=True, topdown=True): - subdirs[:] = [d for d in subdirs if d not in excluded_dir_names] - for file_name in filenames: - relative_path = os.path.relpath(os.path.join(dirpath, file_name), directory) - result.append(relative_path) - - for d in subdirs: - path: str = os.path.join(dirpath, d) - try: - dirs[path] = os.path.getmtime(path) - except FileNotFoundError: - logging.warning(f"Warning: Unable to access {path}. Skipping this path.") - continue + + async def proc_subdir(path: str): + try: + dirs[path] = await aiofiles.os.path.getmtime(path) + except FileNotFoundError: + logging.warning(f"Warning: Unable to access {path}. Skipping this path.") + + def proc_thread(): + asyncio.set_event_loop(asyncio.new_event_loop()) + calls = [] + + async def handle(file): + if not await aiofiles.os.path.isdir(file): + relative_path = os.path.relpath(file, directory) + result.append(relative_path) + return + for subdir in aiofiles.os.listdir(file): + path = os.path.join(file, subdir) + if subdir not in excluded_dir_names: + calls.append(handle(path)) + calls.append(proc_subdir(path)) + + future = asyncio.gather(*calls) + asyncio.get_event_loop().run_until_complete(future) + asyncio.get_event_loop().close() + + thread = threading.Thread(target=proc_thread) + thread.start() + thread.join() + logging.debug("found {} files".format(len(result))) return result, dirs @@ -263,19 +279,41 @@ def cached_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float] if folder_name not in filename_list_cache: return None out = filename_list_cache[folder_name] + must_invalidate = [False] + folders = folder_names_and_paths[folder_name] - for x in out[1]: - time_modified = out[1][x] - folder = x - if os.path.getmtime(folder) != time_modified: - return None + async def check_folder_mtime(folder: str, time_modified: float): + if await aiofiles.os.path.getmtime(folder) != time_modified: + must_invalidate[0] = True - folders = folder_names_and_paths[folder_name] - for x in folders[0]: - if os.path.isdir(x): + async def check_new_dirs(x: str): + if await aiofiles.os.path.isdir(x): if x not in out[1]: - return None + must_invalidate[0] = True + + def proc_thread(): + asyncio.set_event_loop(asyncio.new_event_loop()) + calls = [] + + for x in out[1]: + time_modified = out[1][x] + call = check_folder_mtime(x, time_modified) + calls.append(call) + + for x in folders[0]: + call = check_new_dirs(x) + calls.append(call) + future = asyncio.gather(*calls) + asyncio.get_event_loop().run_until_complete(future) + asyncio.get_event_loop().close() + + thread = threading.Thread(target=proc_thread) + thread.start() + thread.join() + + if must_invalidate[0]: + return None return out def get_filename_list(folder_name: str) -> list[str]: diff --git a/requirements.txt b/requirements.txt index 4c2c0b2b2215..da597b0d4aeb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ transformers>=4.28.1 tokenizers>=0.13.3 sentencepiece safetensors>=0.4.2 +aiofiles aiohttp pyyaml Pillow From d6ba7ed4db54e5755ffe86729925aa948d63e503 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 02:05:49 +0900 Subject: [PATCH 02/18] oops, fix sloppy refactor "refactor right before committing" the whiskey said, "it'll be fine" it said. woopsie. --- folder_paths.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index e05c12595076..6abc573d397f 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -213,14 +213,17 @@ async def handle(file): relative_path = os.path.relpath(file, directory) result.append(relative_path) return - for subdir in aiofiles.os.listdir(file): + for subdir in await aiofiles.os.listdir(file): path = os.path.join(file, subdir) if subdir not in excluded_dir_names: calls.append(handle(path)) calls.append(proc_subdir(path)) + calls.append(handle(directory)) - future = asyncio.gather(*calls) - asyncio.get_event_loop().run_until_complete(future) + while len(calls) > 0: + future = asyncio.gather(*calls) + calls = [] + asyncio.get_event_loop().run_until_complete(future) asyncio.get_event_loop().close() thread = threading.Thread(target=proc_thread) From e98b52bf533ade48885c72338f7feef714e6d669 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 02:31:48 +0900 Subject: [PATCH 03/18] other fix --- folder_paths.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/folder_paths.py b/folder_paths.py index 6abc573d397f..05b0549413cb 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -213,11 +213,11 @@ async def handle(file): relative_path = os.path.relpath(file, directory) result.append(relative_path) return + calls.append(proc_subdir(file)) for subdir in await aiofiles.os.listdir(file): path = os.path.join(file, subdir) if subdir not in excluded_dir_names: calls.append(handle(path)) - calls.append(proc_subdir(path)) calls.append(handle(directory)) while len(calls) > 0: From 2deb55df0655417948dce14c04f84e43f590b39b Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 02:32:07 +0900 Subject: [PATCH 04/18] one more cleanup --- folder_paths.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index 05b0549413cb..4c4d0947766a 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -199,10 +199,7 @@ def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) logging.debug("recursive file list on directory {}".format(directory)) async def proc_subdir(path: str): - try: - dirs[path] = await aiofiles.os.path.getmtime(path) - except FileNotFoundError: - logging.warning(f"Warning: Unable to access {path}. Skipping this path.") + dirs[path] = await aiofiles.os.path.getmtime(path) def proc_thread(): asyncio.set_event_loop(asyncio.new_event_loop()) From b35ad9aeffe2c105546ea4a443664b3e96a0c7b5 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 16:57:47 +0900 Subject: [PATCH 05/18] discard aiofiles --- folder_paths.py | 49 +++++++++++++++++++++++++++++++++++++++--------- requirements.txt | 1 - 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index 4c4d0947766a..b4fccdffb610 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -1,6 +1,7 @@ from __future__ import annotations -import asyncio, aiofiles, threading +import asyncio, threading +from functools import partial, wraps import os import time import mimetypes @@ -8,8 +9,6 @@ from typing import Set, List, Dict, Tuple, Literal from collections.abc import Collection -import aiofiles.os - supported_pt_extensions: set[str] = {'.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl', '.sft'} folder_names_and_paths: dict[str, tuple[list[str], set[str]]] = {} @@ -199,19 +198,19 @@ def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) logging.debug("recursive file list on directory {}".format(directory)) async def proc_subdir(path: str): - dirs[path] = await aiofiles.os.path.getmtime(path) + dirs[path] = await AsyncFiles.getmtime(path) def proc_thread(): asyncio.set_event_loop(asyncio.new_event_loop()) calls = [] async def handle(file): - if not await aiofiles.os.path.isdir(file): - relative_path = os.path.relpath(file, directory) + if not await AsyncFiles.isdir(file): + relative_path = await AsyncFiles.relpath(file, directory) result.append(relative_path) return calls.append(proc_subdir(file)) - for subdir in await aiofiles.os.listdir(file): + for subdir in await AsyncFiles.listdir(file): path = os.path.join(file, subdir) if subdir not in excluded_dir_names: calls.append(handle(path)) @@ -283,11 +282,11 @@ def cached_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float] folders = folder_names_and_paths[folder_name] async def check_folder_mtime(folder: str, time_modified: float): - if await aiofiles.os.path.getmtime(folder) != time_modified: + if await AsyncFiles.getmtime(folder) != time_modified: must_invalidate[0] = True async def check_new_dirs(x: str): - if await aiofiles.os.path.isdir(x): + if await AsyncFiles.isdir(x): if x not in out[1]: must_invalidate[0] = True @@ -371,3 +370,35 @@ def compute_vars(input: str, image_width: int, image_height: int) -> str: os.makedirs(full_output_folder, exist_ok=True) counter = 1 return full_output_folder, filename, counter, subfolder, filename_prefix + + +def aio_wrap(func): + @wraps(func) + async def run(*args, loop=None, executor=None, **kwargs): + if loop is None: + loop = asyncio.get_running_loop() + pfunc = partial(func, *args, **kwargs) + return await loop.run_in_executor(executor, pfunc) + return run + + +class AsyncFiles: + @staticmethod + @aio_wrap + def listdir(path: str) -> list[str]: + return os.listdir(path) + + @staticmethod + @aio_wrap + def isdir(path: str) -> bool: + return os.path.isdir(path) + + @staticmethod + @aio_wrap + def getmtime(path: str) -> float: + return os.path.getmtime(path) + + @staticmethod + @aio_wrap + def relpath(file: str, directory: str) -> str: + return os.path.relpath(file, directory) diff --git a/requirements.txt b/requirements.txt index da597b0d4aeb..4c2c0b2b2215 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,6 @@ transformers>=4.28.1 tokenizers>=0.13.3 sentencepiece safetensors>=0.4.2 -aiofiles aiohttp pyyaml Pillow From b8022cf02bc3ebe4162de6e84217f74468e54c96 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 17:01:22 +0900 Subject: [PATCH 06/18] use a threading.Event to make intent more explicit --- folder_paths.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index b4fccdffb610..14592234ad3d 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -278,17 +278,17 @@ def cached_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float] if folder_name not in filename_list_cache: return None out = filename_list_cache[folder_name] - must_invalidate = [False] + must_invalidate = threading.Event() folders = folder_names_and_paths[folder_name] async def check_folder_mtime(folder: str, time_modified: float): if await AsyncFiles.getmtime(folder) != time_modified: - must_invalidate[0] = True + must_invalidate.set() async def check_new_dirs(x: str): if await AsyncFiles.isdir(x): if x not in out[1]: - must_invalidate[0] = True + must_invalidate.set() def proc_thread(): asyncio.set_event_loop(asyncio.new_event_loop()) @@ -311,7 +311,7 @@ def proc_thread(): thread.start() thread.join() - if must_invalidate[0]: + if must_invalidate.is_set(): return None return out From 5d8a0b7afd65c3b83e037669de1901b72d711d44 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 17:12:50 +0900 Subject: [PATCH 07/18] simplify async code with ThreadPoolExecutor --- folder_paths.py | 94 ++++++++++++------------------------------------- 1 file changed, 22 insertions(+), 72 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index 14592234ad3d..4ba9856dfc4f 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -8,6 +8,7 @@ import logging from typing import Set, List, Dict, Tuple, Literal from collections.abc import Collection +from concurrent.futures import ThreadPoolExecutor supported_pt_extensions: set[str] = {'.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl', '.sft'} @@ -197,34 +198,24 @@ def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) logging.debug("recursive file list on directory {}".format(directory)) - async def proc_subdir(path: str): - dirs[path] = await AsyncFiles.getmtime(path) + with ThreadPoolExecutor() as executor: - def proc_thread(): - asyncio.set_event_loop(asyncio.new_event_loop()) - calls = [] + def proc_subdir(path: str): + dirs[path] = os.path.getmtime(path) - async def handle(file): - if not await AsyncFiles.isdir(file): - relative_path = await AsyncFiles.relpath(file, directory) + def handle(file): + if not os.path.isdir(file): + relative_path = os.path.relpath(file, directory) result.append(relative_path) return - calls.append(proc_subdir(file)) - for subdir in await AsyncFiles.listdir(file): + executor.submit(lambda: proc_subdir(file)) + for subdir in os.listdir(file): path = os.path.join(file, subdir) if subdir not in excluded_dir_names: - calls.append(handle(path)) - calls.append(handle(directory)) + executor.submit(lambda: handle(path)) - while len(calls) > 0: - future = asyncio.gather(*calls) - calls = [] - asyncio.get_event_loop().run_until_complete(future) - asyncio.get_event_loop().close() - - thread = threading.Thread(target=proc_thread) - thread.start() - thread.join() + executor.submit(lambda: handle(directory)) + executor.shutdown(wait=True) logging.debug("found {} files".format(len(result))) return result, dirs @@ -281,35 +272,26 @@ def cached_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float] must_invalidate = threading.Event() folders = folder_names_and_paths[folder_name] - async def check_folder_mtime(folder: str, time_modified: float): - if await AsyncFiles.getmtime(folder) != time_modified: - must_invalidate.set() + with ThreadPoolExecutor() as executor: - async def check_new_dirs(x: str): - if await AsyncFiles.isdir(x): - if x not in out[1]: + def check_folder_mtime(folder: str, time_modified: float): + if os.path.getmtime(folder) != time_modified: must_invalidate.set() - def proc_thread(): - asyncio.set_event_loop(asyncio.new_event_loop()) - calls = [] + def check_new_dirs(x: str): + if os.path.isdir(x): + if x not in out[1]: + must_invalidate.set() for x in out[1]: time_modified = out[1][x] - call = check_folder_mtime(x, time_modified) - calls.append(call) + executor.submit(lambda: check_folder_mtime(x, time_modified)) for x in folders[0]: - call = check_new_dirs(x) - calls.append(call) + executor.submit(lambda: check_new_dirs(x)) - future = asyncio.gather(*calls) - asyncio.get_event_loop().run_until_complete(future) - asyncio.get_event_loop().close() + executor.shutdown(wait=True) - thread = threading.Thread(target=proc_thread) - thread.start() - thread.join() if must_invalidate.is_set(): return None @@ -370,35 +352,3 @@ def compute_vars(input: str, image_width: int, image_height: int) -> str: os.makedirs(full_output_folder, exist_ok=True) counter = 1 return full_output_folder, filename, counter, subfolder, filename_prefix - - -def aio_wrap(func): - @wraps(func) - async def run(*args, loop=None, executor=None, **kwargs): - if loop is None: - loop = asyncio.get_running_loop() - pfunc = partial(func, *args, **kwargs) - return await loop.run_in_executor(executor, pfunc) - return run - - -class AsyncFiles: - @staticmethod - @aio_wrap - def listdir(path: str) -> list[str]: - return os.listdir(path) - - @staticmethod - @aio_wrap - def isdir(path: str) -> bool: - return os.path.isdir(path) - - @staticmethod - @aio_wrap - def getmtime(path: str) -> float: - return os.path.getmtime(path) - - @staticmethod - @aio_wrap - def relpath(file: str, directory: str) -> str: - return os.path.relpath(file, directory) From 163bfff829a962d2bfddd1a88743060cc65b2e91 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 17:24:51 +0900 Subject: [PATCH 08/18] `.shutdown` isn't so friendly :( --- folder_paths.py | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index 4ba9856dfc4f..5386dcbfb86f 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -1,7 +1,6 @@ from __future__ import annotations -import asyncio, threading -from functools import partial, wraps +import threading import os import time import mimetypes @@ -199,23 +198,31 @@ def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) logging.debug("recursive file list on directory {}".format(directory)) with ThreadPoolExecutor() as executor: + calls = [] def proc_subdir(path: str): dirs[path] = os.path.getmtime(path) def handle(file): - if not os.path.isdir(file): - relative_path = os.path.relpath(file, directory) - result.append(relative_path) - return - executor.submit(lambda: proc_subdir(file)) - for subdir in os.listdir(file): - path = os.path.join(file, subdir) - if subdir not in excluded_dir_names: - executor.submit(lambda: handle(path)) - - executor.submit(lambda: handle(directory)) - executor.shutdown(wait=True) + try: + if not os.path.isdir(file): + relative_path = os.path.relpath(file, directory) + result.append(relative_path) + return + + calls.append(executor.submit(lambda: proc_subdir(file))) + + for subdir in os.listdir(file): + path = os.path.join(file, subdir) + if subdir not in excluded_dir_names: + calls.append(executor.submit(lambda: handle(path))) + except Exception as e: + logging.error(f"Error while handling {file}: {e}") + + calls.append(executor.submit(lambda: handle(directory))) + while len(calls) > 0: + calls.pop().result() + logging.debug("found {} files".format(len(result))) return result, dirs From d659be7ef04798cde316cf3f191a4f0ad045d798 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 17:49:29 +0900 Subject: [PATCH 09/18] minor format --- folder_paths.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index f1406b197fb4..af498b399aea 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -303,7 +303,7 @@ def cached_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float] strong_cache = cache_helper.get(folder_name) if strong_cache is not None: return strong_cache - + global filename_list_cache global folder_names_and_paths folder_name = map_legacy(folder_name) @@ -333,7 +333,6 @@ def check_new_dirs(x: str): executor.shutdown(wait=True) - if must_invalidate.is_set(): return None return out From 49f973bc9304ed2e4eb019338324ef29c24feb86 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 17:56:05 +0900 Subject: [PATCH 10/18] minor improvement --- folder_paths.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/folder_paths.py b/folder_paths.py index af498b399aea..15960537fed2 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -224,6 +224,7 @@ def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) dirs[directory] = os.path.getmtime(directory) except FileNotFoundError: logging.warning(f"Warning: Unable to access {directory}. Skipping this path.") + return [], {} logging.debug("recursive file list on directory {}".format(directory)) @@ -253,7 +254,6 @@ def handle(file): while len(calls) > 0: calls.pop().result() - logging.debug("found {} files".format(len(result))) return result, dirs From c3d47aa429843f3feea07123d292e0af5fe8c34b Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 18:01:35 +0900 Subject: [PATCH 11/18] clearer error message --- folder_paths.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/folder_paths.py b/folder_paths.py index 15960537fed2..d530e31617e2 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -248,7 +248,7 @@ def handle(file): if subdir not in excluded_dir_names: calls.append(executor.submit(lambda: handle(path))) except Exception as e: - logging.error(f"Error while handling {file}: {e}") + logging.error(f"recursive_search encountered error while handling '{file}': {e}") calls.append(executor.submit(lambda: handle(directory))) while len(calls) > 0: From 5a00feae3cc6cbe716b08e801afca74ad54d98e8 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 19 Sep 2024 21:02:21 +0900 Subject: [PATCH 12/18] handle python lambda behaviors --- folder_paths.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index d530e31617e2..2312cbc7ce51 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -241,12 +241,12 @@ def handle(file): result.append(relative_path) return - calls.append(executor.submit(lambda: proc_subdir(file))) + calls.append(executor.submit(lambda f=file: proc_subdir(f))) for subdir in os.listdir(file): - path = os.path.join(file, subdir) if subdir not in excluded_dir_names: - calls.append(executor.submit(lambda: handle(path))) + path = os.path.join(file, subdir) + calls.append(executor.submit(lambda p=path: handle(p))) except Exception as e: logging.error(f"recursive_search encountered error while handling '{file}': {e}") @@ -326,10 +326,10 @@ def check_new_dirs(x: str): for x in out[1]: time_modified = out[1][x] - executor.submit(lambda: check_folder_mtime(x, time_modified)) + executor.submit(lambda f=x, t=time_modified: check_folder_mtime(f, t)) for x in folders[0]: - executor.submit(lambda: check_new_dirs(x)) + executor.submit(lambda f=x: check_new_dirs(f)) executor.shutdown(wait=True) From 00db33f6b7fcb1a1321286d88c95cf138cabe9fe Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 3 Oct 2024 12:49:52 -0700 Subject: [PATCH 13/18] use one persistent thread pool executor instead of generating new ones --- folder_paths.py | 68 +++++++++++++++++++++++++------------------------ 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index be416b7433f9..c99d3866dfbb 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -48,6 +48,8 @@ filename_list_cache: dict[str, tuple[list[str], dict[str, float], float]] = {} +async_executor = ThreadPoolExecutor() + class CacheHelper: """ Helper class for managing file list cache data. @@ -231,31 +233,30 @@ def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) logging.debug("recursive file list on directory {}".format(directory)) - with ThreadPoolExecutor() as executor: - calls = [] + calls = [] - def proc_subdir(path: str): - dirs[path] = os.path.getmtime(path) + def proc_subdir(path: str): + dirs[path] = os.path.getmtime(path) - def handle(file): - try: - if not os.path.isdir(file): - relative_path = os.path.relpath(file, directory) - result.append(relative_path) - return + def handle(file): + try: + if not os.path.isdir(file): + relative_path = os.path.relpath(file, directory) + result.append(relative_path) + return - calls.append(executor.submit(lambda f=file: proc_subdir(f))) + calls.append(async_executor.submit(lambda f=file: proc_subdir(f))) - for subdir in os.listdir(file): - if subdir not in excluded_dir_names: - path = os.path.join(file, subdir) - calls.append(executor.submit(lambda p=path: handle(p))) - except Exception as e: - logging.error(f"recursive_search encountered error while handling '{file}': {e}") + for subdir in os.listdir(file): + if subdir not in excluded_dir_names: + path = os.path.join(file, subdir) + calls.append(async_executor.submit(lambda p=path: handle(p))) + except Exception as e: + logging.error(f"recursive_search encountered error while handling '{file}': {e}") - calls.append(executor.submit(lambda: handle(directory))) - while len(calls) > 0: - calls.pop().result() + calls.append(async_executor.submit(lambda: handle(directory))) + while len(calls) > 0: + calls.pop().result() logging.debug("found {} files".format(len(result))) return result, dirs @@ -316,25 +317,26 @@ def cached_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float] must_invalidate = threading.Event() folders = folder_names_and_paths[folder_name] - with ThreadPoolExecutor() as executor: + def check_folder_mtime(folder: str, time_modified: float): + if os.path.getmtime(folder) != time_modified: + must_invalidate.set() - def check_folder_mtime(folder: str, time_modified: float): - if os.path.getmtime(folder) != time_modified: + def check_new_dirs(x: str): + if os.path.isdir(x): + if x not in out[1]: must_invalidate.set() - def check_new_dirs(x: str): - if os.path.isdir(x): - if x not in out[1]: - must_invalidate.set() + calls = [] - for x in out[1]: - time_modified = out[1][x] - executor.submit(lambda f=x, t=time_modified: check_folder_mtime(f, t)) + for x in out[1]: + time_modified = out[1][x] + calls.append(async_executor.submit(lambda f=x, t=time_modified: check_folder_mtime(f, t))) - for x in folders[0]: - executor.submit(lambda f=x: check_new_dirs(f)) + for x in folders[0]: + calls.append(async_executor.submit(lambda f=x: check_new_dirs(f))) - executor.shutdown(wait=True) + while len(calls) > 0: + calls.pop().result() if must_invalidate.is_set(): return None From 587d0c95f281ac6d6de58fe22387dacc31e9726c Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 3 Oct 2024 13:01:12 -0700 Subject: [PATCH 14/18] add a max_workers value the default is 32 limited by cpu core count, but we don't really care about core count here because this is for io, so just use 32 --- folder_paths.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/folder_paths.py b/folder_paths.py index c99d3866dfbb..c4ff5a91e94c 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -48,7 +48,7 @@ filename_list_cache: dict[str, tuple[list[str], dict[str, float], float]] = {} -async_executor = ThreadPoolExecutor() +async_executor = ThreadPoolExecutor(32) class CacheHelper: """ From ba90c6065c07b6c4dd76f955e341fb1995a94ddb Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 3 Oct 2024 13:11:22 -0700 Subject: [PATCH 15/18] pre-scan model lists --- folder_paths.py | 15 ++++++++++++++- main.py | 2 ++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/folder_paths.py b/folder_paths.py index c4ff5a91e94c..d4889abd8599 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -5,6 +5,7 @@ import time import mimetypes import logging +import time from typing import Set, List, Dict, Tuple, Literal from collections.abc import Collection from concurrent.futures import ThreadPoolExecutor @@ -214,6 +215,18 @@ def get_folder_paths(folder_name: str) -> list[str]: folder_name = map_legacy(folder_name) return folder_names_and_paths[folder_name][0][:] + +def prebuild_lists(): + start_time = time.time() + calls = [] + for folder_name in folder_names_and_paths: + calls.append(async_executor.submit(lambda: get_filename_list(folder_name))) + while len(calls) > 0: + calls.pop().result() + end_time = time.time() + logging.info("Scanned model lists in {:.2f} seconds".format(end_time - start_time)) + + def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) -> tuple[list[str], dict[str, float]]: if not os.path.isdir(directory): return [], {} @@ -261,11 +274,11 @@ def handle(file): logging.debug("found {} files".format(len(result))) return result, dirs + def filter_files_extensions(files: Collection[str], extensions: Collection[str]) -> list[str]: return sorted(list(filter(lambda a: os.path.splitext(a)[-1].lower() in extensions or len(extensions) == 0, files))) - def get_full_path(folder_name: str, filename: str) -> str | None: global folder_names_and_paths folder_name = map_legacy(folder_name) diff --git a/main.py b/main.py index 3f5e2137eebd..09ff6355feff 100644 --- a/main.py +++ b/main.py @@ -212,6 +212,8 @@ def cleanup_temp(): nodes.init_extra_nodes(init_custom_nodes=not args.disable_all_custom_nodes) + folder_paths.prebuild_lists() + cuda_malloc_warning() server.add_routes() From 409f4b658346a2a3e05dcfad8ce881977461a909 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 3 Oct 2024 13:17:56 -0700 Subject: [PATCH 16/18] minor cleanup --- folder_paths.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index d4889abd8599..d124a352be04 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -221,8 +221,8 @@ def prebuild_lists(): calls = [] for folder_name in folder_names_and_paths: calls.append(async_executor.submit(lambda: get_filename_list(folder_name))) - while len(calls) > 0: - calls.pop().result() + for call in calls: + call.result() end_time = time.time() logging.info("Scanned model lists in {:.2f} seconds".format(end_time - start_time)) @@ -348,8 +348,8 @@ def check_new_dirs(x: str): for x in folders[0]: calls.append(async_executor.submit(lambda f=x: check_new_dirs(f))) - while len(calls) > 0: - calls.pop().result() + for call in calls: + call.result() if must_invalidate.is_set(): return None From 6fca6e1746616d2ec76afd5c95a330cbf9073419 Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 3 Oct 2024 13:27:04 -0700 Subject: [PATCH 17/18] prebuild should have its own executor otherwise it can lock itself due to the very awkward hack of python threading instead of genuine async handling --- folder_paths.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/folder_paths.py b/folder_paths.py index d124a352be04..82ae5b978092 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -5,7 +5,6 @@ import time import mimetypes import logging -import time from typing import Set, List, Dict, Tuple, Literal from collections.abc import Collection from concurrent.futures import ThreadPoolExecutor @@ -217,13 +216,14 @@ def get_folder_paths(folder_name: str) -> list[str]: def prebuild_lists(): - start_time = time.time() - calls = [] - for folder_name in folder_names_and_paths: - calls.append(async_executor.submit(lambda: get_filename_list(folder_name))) - for call in calls: - call.result() - end_time = time.time() + start_time = time.perf_counter() + with ThreadPoolExecutor(32) as executor: + calls = [] + for folder_name in folder_names_and_paths: + calls.append(executor.submit(lambda: get_filename_list(folder_name))) + for call in calls: + call.result() + end_time = time.perf_counter() logging.info("Scanned model lists in {:.2f} seconds".format(end_time - start_time)) From 67a69da55f6748b4eea9fdced6aa53d8761e0a2d Mon Sep 17 00:00:00 2001 From: "Alex \"mcmonkey\" Goodwin" Date: Thu, 3 Oct 2024 14:02:09 -0700 Subject: [PATCH 18/18] pretty that up a tad --- folder_paths.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/folder_paths.py b/folder_paths.py index 82ae5b978092..26b4fcb37e73 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -217,14 +217,17 @@ def get_folder_paths(folder_name: str) -> list[str]: def prebuild_lists(): start_time = time.perf_counter() + with ThreadPoolExecutor(32) as executor: calls = [] for folder_name in folder_names_and_paths: calls.append(executor.submit(lambda: get_filename_list(folder_name))) + for call in calls: call.result() + end_time = time.perf_counter() - logging.info("Scanned model lists in {:.2f} seconds".format(end_time - start_time)) + logging.info(f"Scanned model lists in {end_time - start_time:.2f} seconds") def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) -> tuple[list[str], dict[str, float]]: