|
1 | 1 | import random |
2 | 2 | import asyncio |
3 | 3 | import threading |
4 | | -import weakref |
5 | 4 | import logging |
6 | 5 | import time |
| 6 | +import atexit |
7 | 7 | from datetime import datetime |
8 | 8 | from enum import Enum |
9 | 9 | from typing import Callable, Generic, TypeVar, Coroutine, Any |
10 | 10 | from threading import Semaphore |
11 | | -from concurrent.futures.thread import ThreadPoolExecutor, _worker, _base, _threads_queues |
| 11 | +from concurrent.futures.thread import ThreadPoolExecutor |
12 | 12 |
|
13 | 13 | from alibabacloud_credentials.exceptions import CredentialException |
14 | 14 | from alibabacloud_credentials_api import ICredentials |
|
20 | 20 | INT64_MAX = 2 ** 63 - 1 |
21 | 21 | MAX_CONCURRENT_REFRESHES = 100 |
22 | 22 | CONCURRENT_REFRESH_LEASES = Semaphore(MAX_CONCURRENT_REFRESHES) |
| 23 | +EXECUTOR = ThreadPoolExecutor(max_workers=INT64_MAX, thread_name_prefix='non-blocking-refresh') |
23 | 24 |
|
24 | 25 |
|
25 | | -class _DaemonThreadPoolExecutor(ThreadPoolExecutor): |
26 | | - def _adjust_thread_count(self): |
27 | | - # if idle threads are available, don't spin new threads |
28 | | - if self._idle_semaphore.acquire(timeout=0): |
29 | | - return |
30 | | - |
31 | | - # When the executor gets lost, the weakref callback will wake up |
32 | | - # the worker threads. |
33 | | - def weakref_cb(_, q=self._work_queue): |
34 | | - q.put(None) |
35 | | - |
36 | | - num_threads = len(self._threads) |
37 | | - if num_threads < self._max_workers: |
38 | | - thread_name = '%s_%d' % (self._thread_name_prefix or self, |
39 | | - num_threads) |
40 | | - t = threading.Thread(target=_worker, |
41 | | - name=thread_name, |
42 | | - args=(weakref.ref(self, weakref_cb), |
43 | | - self._work_queue, |
44 | | - self._initializer, |
45 | | - self._initargs), |
46 | | - daemon=True) # Set thread as daemon |
47 | | - t.start() |
48 | | - self._threads.add(t) |
49 | | - _threads_queues[t] = self._work_queue |
| 26 | +def _shutdown_handler(): |
| 27 | + log.debug("Shutting down executor...") |
| 28 | + EXECUTOR.shutdown(wait=False) |
50 | 29 |
|
51 | 30 |
|
52 | | -EXECUTOR = _DaemonThreadPoolExecutor(max_workers=INT64_MAX, thread_name_prefix='non-blocking-refresh') |
| 31 | +atexit.register(_shutdown_handler) |
53 | 32 |
|
54 | 33 |
|
55 | 34 | def _jitter_time(now: int, jitter_start: int, jitter_end: int) -> int: |
@@ -141,6 +120,8 @@ def prefetch(self, action: Callable): |
141 | 120 |
|
142 | 121 | try: |
143 | 122 | EXECUTOR.submit(action) |
| 123 | + except KeyboardInterrupt: |
| 124 | + _shutdown_handler() |
144 | 125 | except Exception as t: |
145 | 126 | log.warning(f'Exception occurred when submitting background task.', exc_info=True) |
146 | 127 | finally: |
|
0 commit comments