2
2
3
3
namespace skrtdev \async ;
4
4
5
- use Closure ;
6
5
7
6
class Pool{
8
7
@@ -19,15 +18,14 @@ class Pool{
19
18
20
19
public function __construct (?int $ max_childs = null , bool $ kill_childs = true )
21
20
{
22
- if (!extension_loaded (" pcntl " )){
23
- throw new MissingExtensionException (" PCNTL Extension is missing in your PHP build " );
21
+ if (!extension_loaded (' pcntl ' )){
22
+ throw new MissingExtensionException (' PCNTL Extension is missing in your PHP build ' );
24
23
}
25
24
$ this ->pid = getmypid ();
26
25
$ this ->max_childs = $ max_childs ?? (self ::getCoresCount () ?? 1 ) * 10 ;
27
26
$ this ->kill_childs = $ kill_childs ;
28
27
29
- register_tick_function ([$ this , "tick " ]);
30
- #pcntl_signal(SIGCHLD, SIG_IGN); // ignores the SIGCHLD signal
28
+ register_tick_function ([$ this , 'tick ' ]);
31
29
32
30
pcntl_async_signals (true );
33
31
@@ -41,52 +39,33 @@ public function __construct(?int $max_childs = null, bool $kill_childs = true)
41
39
42
40
foreach ($ this ->childs as $ key => $ child ) {
43
41
if ($ pid === $ child ){
44
- self ::breakpoint (" removed child from signal handler " );
42
+ self ::breakpoint (' removed child from signal handler ' );
45
43
unset($ this ->childs [$ key ]);
44
+ break ;
46
45
}
47
46
}
48
47
}
49
48
});
50
49
}
51
50
52
- public function checkChilds (): bool
53
- {
54
- return false ;
55
- self ::breakpoint ("checkChilds() " );
56
- $ removed = 0 ;
57
- foreach ($ this ->childs as $ key => $ child ) {
58
- if (!self ::isProcessRunning ($ child )){
59
- unset($ this ->childs [$ key ]);
60
- $ removed ++;
61
- }
62
- }
63
- if ($ removed === 0 ){
64
- self ::breakpoint ("CheckChilds didn't remove any child " );
65
- return false ;
66
- }
67
- else {
68
- self ::breakpoint ("CheckChilds removed $ removed childs " );
69
- return true ;
70
- };
71
- }
72
51
73
- public function enqueue (Closure $ closure , string $ process_title = null , array $ args = []): void
52
+ public function enqueue (callable $ callable , array $ args = []): void
74
53
{
75
- $ this ->queue [] = [$ closure , $ process_title , $ args ];
54
+ $ this ->queue [] = [$ callable , $ args ];
76
55
}
77
56
78
- protected function _parallel (Closure $ closure , string $ process_title = null , array $ args = [])
57
+ protected function _parallel (callable $ callable , array $ args = [])
79
58
{
80
- self ::breakpoint (" started a parallel " );
81
- self ::breakpoint (" parallel can be done: current childs: " .count ($ this ->childs )." / " .$ this ->max_childs );
59
+ self ::breakpoint (' started a parallel ' );
60
+ self ::breakpoint (' parallel can be done: current childs: ' .count ($ this ->childs ).' / ' .$ this ->max_childs );
82
61
$ pid = pcntl_fork ();
83
62
if ($ pid == -1 ) {
84
- throw new CouldNotForkException (" Pool could not fork " );
63
+ throw new CouldNotForkException (' Pool could not fork ' );
85
64
}
86
65
elseif ($ pid ){
87
66
// we are the parent
88
67
$ this ->childs [] = $ pid ;
89
- self ::breakpoint (" child started " );
68
+ self ::breakpoint (' child started ' );
90
69
pcntl_wait ($ status , WNOHANG );
91
70
}
92
71
else {
@@ -95,63 +74,59 @@ protected function _parallel(Closure $closure, string $process_title = null, arr
95
74
if (!$ this ->kill_childs ) {
96
75
pcntl_signal (SIGINT , SIG_IGN );
97
76
}
98
- if (isset ($ process_title )){
99
- @cli_set_process_title ($ process_title );
100
- }
101
- $ closure (...$ args );
77
+ $ callable (...$ args );
102
78
exit ;
103
79
}
104
80
}
105
81
106
- public function parallel (Closure $ closure , string $ process_title = null , ...$ args )
82
+ public function parallel (callable $ callable , ...$ args )
107
83
{
108
- if (count ($ this ->childs ) > $ this ->max_childs /2 ){
109
- #$this->checkChilds();
110
- }
111
84
if ($ this ->hasQueue ()){
112
- self ::breakpoint (" resolving queue before parallel() " );
85
+ self ::breakpoint (' resolving queue before parallel() ' );
113
86
$ this ->resolveQueue ();
114
87
if ($ this ->hasQueue ()){
115
- self ::breakpoint (" enqueueing because there is a queue " );
116
- return $ this ->enqueue ($ closure , $ process_title , $ args );
88
+ self ::breakpoint (' enqueueing because there is a queue ' );
89
+ return $ this ->enqueue ($ callable , $ args );
117
90
}
118
91
}
119
92
elseif (count ($ this ->childs ) > $ this ->max_childs ){
120
- self ::breakpoint ("enqueueing because of max reached (tried checkChilds but no results) " );
121
- return $ this ->enqueue ($ closure , $ process_title , $ args );
93
+ self ::breakpoint ('enqueueing because of max reached ' );
94
+ return $ this ->enqueue ($ callable , $ args );
95
+ }
96
+ return $ this ->_parallel ($ callable , $ args );
97
+ }
98
+
99
+ public function iterate (iterable $ iterable , callable $ callable ): void
100
+ {
101
+ foreach ($ iterable as $ value ) {
102
+ $ this ->parallel ($ callable , $ value );
122
103
}
123
- return $ this ->_parallel ($ closure , $ process_title , $ args );
124
104
}
125
105
126
106
public function resolveQueue (): void
127
107
{
128
108
if ($ this ->is_resolving_queue ) return ;
129
109
130
110
if (count ($ this ->childs ) >= $ this ->max_childs ){
131
- self ::breakpoint ("resolveQueue() -> too many childs, trying to remove... " .PHP_EOL ."check childs from resolveQueue() " );
132
- if (true || !$ this ->checkChilds ()){
133
- self ::breakpoint ("resolveQueue() exited because of too many childs " );
134
- return ;
135
- }
111
+ self ::breakpoint ('resolveQueue() exited because of too many childs ' );
112
+ return ;
136
113
}
137
114
138
115
$ this ->is_resolving_queue = true ;
139
116
140
- foreach ($ this ->queue as $ key => $ closure ) {
117
+ foreach ($ this ->queue as $ key => $ callable ) {
141
118
if (count ($ this ->childs ) < $ this ->max_childs ){
142
119
unset($ this ->queue [$ key ]);
143
120
self ::breakpoint ("resolveQueue() is resolving n. $ key " );
144
- $ this ->_parallel (...$ closure );
121
+ $ this ->_parallel (...$ callable );
145
122
}
146
123
else {
147
- self ::breakpoint (" resolveQueue() can't resolve, too many childs " );
124
+ self ::breakpoint (' resolveQueue() can \ 't resolve, too many childs ' );
148
125
break ;
149
- self ::breakpoint ("check childs from resolveQueue() " );
150
- $ this ->checkChilds ();
151
126
}
152
127
}
153
128
if (empty ($ this ->queue )){
154
- self ::breakpoint (" queue is empty " );
129
+ self ::breakpoint (' queue is empty ' );
155
130
}
156
131
157
132
$ this ->is_resolving_queue = false ;
@@ -162,22 +137,16 @@ public function __destruct()
162
137
{
163
138
if ($ this ->is_parent ){
164
139
$ this ->need_tick = false ;
165
- self ::breakpoint (" triggered destructor " );
140
+ self ::breakpoint (' triggered destructor ' );
166
141
$ this ->wait ();
167
142
}
168
143
}
169
144
170
145
public static function getCoresCount (): ?int
171
146
{
172
- if (isset (self ::$ cores_count ) && self ::$ cores_count === 0 ) return null ;
147
+ if (isset (self ::$ cores_count )) return self ::$ cores_count === 0 ? null : self ::$ cores_count ;
148
+
173
149
174
- if (defined ('PHP_WINDOWS_VERSION_MAJOR ' )){
175
- $ str = trim (shell_exec ('wmic cpu get NumberOfCores 2>&1 ' ));
176
- if (!preg_match ('/(\d+)/ ' , $ str , $ matches )) {
177
- $ cores_count = null ;
178
- }
179
- $ cores_count = (int ) $ matches [1 ];
180
- }
181
150
$ ret = @shell_exec ('nproc 2> /dev/null ' );
182
151
if (is_string ($ ret )) {
183
152
$ ret = trim ($ ret );
@@ -199,19 +168,12 @@ public static function getCoresCount(): ?int
199
168
200
169
public static function breakpoint ($ value ){
201
170
return ;
202
- usleep (5000 );
203
171
print ($ value .PHP_EOL );
204
172
}
205
173
206
- public static function isProcessRunning ($ pid ): bool
207
- {
208
- return posix_getpgid ($ pid ) !== false ;
209
- }
210
-
211
174
public function tick ()
212
175
{
213
176
if ($ this ->is_parent && $ this ->need_tick && self ::$ last_tick !== time ()){
214
- #print("tick".PHP_EOL);
215
177
self ::$ last_tick = time ();
216
178
if (!$ this ->is_resolving_queue ) $ this ->resolveQueue ();
217
179
}
@@ -240,7 +202,7 @@ public function getChildsCount(): int
240
202
public function waitQueue (): void
241
203
{
242
204
while ($ this ->hasQueue ()){
243
- self ::breakpoint (" queue is not empty " );
205
+ self ::breakpoint (' queue is not empty ' );
244
206
$ this ->resolveQueue ();
245
207
usleep (10000 );
246
208
}
@@ -249,8 +211,7 @@ public function waitQueue(): void
249
211
public function waitChilds (): void
250
212
{
251
213
while ($ this ->hasChilds ()){
252
- self ::breakpoint ("there are still childs " );
253
- #$this->checkChilds();
214
+ self ::breakpoint ('there are still childs ' );
254
215
usleep (10000 );
255
216
}
256
217
}
0 commit comments