-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsenzing_governor_tester_context_manager.py
executable file
·76 lines (51 loc) · 1.89 KB
/
senzing_governor_tester_context_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#! /usr/bin/env python3
# Import from standard library. https://docs.python.org/3/library/
import logging
import os
import threading
import time
# Import from local file system.
import senzing_governor
from senzing_governor import Governor
# Metadata
# An empty __all__ means a wildcard import of this module exports no names.
__all__ = []
__version__ = "1.0.0" # See https://www.python.org/dev/peps/pep-0396/
__date__ = '2020-08-26'
__updated__ = '2022-03-22'
# Format handed to logging.basicConfig() in the main section below:
# the record's timestamp followed by the log message.
log_format = '%(asctime)s %(message)s'
# -----------------------------------------------------------------------------
# Class: ExampleThread
# -----------------------------------------------------------------------------
class ExampleThread(threading.Thread):
    """Worker thread that calls a shared governor a fixed number of times.

    Attributes:
        counter: Number of loop iterations completed so far.
        counter_max: Number of iterations ``run`` performs before returning.
        governor: Object whose ``govern()`` method is called once per
            iteration.
    """

    def __init__(self, governor, counter_max):
        """Record the governor and the iteration limit.

        The thread is only constructed here; the caller starts it.

        Args:
            governor: Object exposing a ``govern()`` method.
            counter_max: Total number of ``govern()`` calls ``run`` makes.
        """
        super().__init__()
        self.counter = 0
        self.counter_max = counter_max
        self.governor = governor

    def run(self):
        """Call ``govern()`` until ``counter`` reaches ``counter_max``.

        After each call, the name of the executing thread is logged at INFO
        level.
        """
        while self.counter < self.counter_max:
            self.counter += 1
            self.governor.govern()
            # Lazy %-style args: the message is only built if INFO is enabled.
            logging.info("%s", threading.current_thread().name)
# -----------------------------------------------------------------------------
# main
# -----------------------------------------------------------------------------
if __name__ == '__main__':

    # Configure logging.
    log_level = logging.INFO
    logging.basicConfig(format=log_format, level=log_level)

    # Report which senzing_governor module file was actually imported.
    logging.info("Governor file: {0}".format(senzing_governor.__file__))

    # Create Governor.
    # The governor is used as a context manager. Every worker is created,
    # started and joined inside the `with` body, so the context is only
    # exited after all threads have finished calling govern().
    with Governor(hint="ContextManagerTester") as governor:

        # Create threads: five workers sharing the one governor, each
        # making 1000 govern() calls.
        threads = []
        for i in range(5):
            thread = ExampleThread(governor, 1000)
            thread.name = "{0}-thread-{1}".format(ExampleThread.__name__, i)
            threads.append(thread)

        # Start threads.
        for thread in threads:
            thread.start()

        # Collect inactive threads: block until every worker has returned.
        for thread in threads:
            thread.join()