@@ -48,6 +48,26 @@ public function __construct(?int $max_childs = null, bool $kill_childs = true)
48
48
});
49
49
}
50
50
51
+ protected function checkChilds (): bool
52
+ {
53
+ self ::breakpoint ("checkChilds() " );
54
+ $ removed = 0 ;
55
+ foreach ($ this ->childs as $ key => $ child ) {
56
+ if (!self ::isProcessRunning ($ child )){
57
+ unset($ this ->childs [$ key ]);
58
+ $ removed ++;
59
+ }
60
+ }
61
+ if ($ removed === 0 ){
62
+ self ::breakpoint ("CheckChilds didn't remove any child " );
63
+ return false ;
64
+ }
65
+ else {
66
+ self ::breakpoint ("CheckChilds removed $ removed childs " );
67
+ return true ;
68
+ };
69
+ }
70
+
51
71
52
72
public function enqueue (callable $ callable , array $ args = []): void
53
73
{
@@ -108,8 +128,10 @@ public function resolveQueue(): void
108
128
if ($ this ->is_resolving_queue ) return ;
109
129
110
130
if (count ($ this ->childs ) >= $ this ->max_childs ){
111
- self ::breakpoint ('resolveQueue() exited because of too many childs ' );
112
- return ;
131
+ if (!$ this ->checkChilds ()) {
132
+ self ::breakpoint ('resolveQueue() exited because of too many childs ' );
133
+ return ;
134
+ }
113
135
}
114
136
115
137
$ this ->is_resolving_queue = true ;
@@ -135,7 +157,8 @@ public function resolveQueue(): void
135
157
136
158
public function __destruct ()
137
159
{
138
- if ($ this ->is_parent ){
160
+ // pid check added because of an unidentified bug
161
+ if ($ this ->is_parent && $ this ->pid === getmypid ()){
139
162
$ this ->need_tick = false ;
140
163
self ::breakpoint ('triggered destructor ' );
141
164
$ this ->wait ();
@@ -147,20 +170,20 @@ public static function getCoresCount(): ?int
147
170
if (isset (self ::$ cores_count )) return self ::$ cores_count === 0 ? null : self ::$ cores_count ;
148
171
149
172
150
- $ ret = @shell_exec ('nproc 2> /dev/null ' );
151
- if (is_string ($ ret )) {
152
- $ ret = trim ($ ret );
153
- if (false !== ($ tmp = filter_var ($ ret , FILTER_VALIDATE_INT ))){
154
- $ cores_count = $ tmp ;
155
- }
156
- }
157
- if (is_readable ('/proc/cpuinfo 2> /dev/null ' )) {
158
- $ cpuinfo = file_get_contents ('/proc/cpuinfo ' );
159
- $ count = substr_count ($ cpuinfo , 'processor ' );
160
- if ($ count > 0 ) {
161
- $ cores_count = $ count ;
162
- }
163
- }
173
+ $ ret = @shell_exec ('nproc 2> /dev/null ' );
174
+ if (is_string ($ ret )) {
175
+ $ ret = trim ($ ret );
176
+ if (false !== ($ tmp = filter_var ($ ret , FILTER_VALIDATE_INT ))){
177
+ $ cores_count = $ tmp ;
178
+ }
179
+ }
180
+ if (is_readable ('/proc/cpuinfo 2> /dev/null ' )) {
181
+ $ cpuinfo = file_get_contents ('/proc/cpuinfo ' );
182
+ $ count = substr_count ($ cpuinfo , 'processor ' );
183
+ if ($ count > 0 ) {
184
+ $ cores_count = $ count ;
185
+ }
186
+ }
164
187
165
188
self ::$ cores_count = $ cores_count ?? 0 ;
166
189
return $ cores_count ?? null ;
@@ -210,8 +233,12 @@ public function waitQueue(): void
210
233
211
234
public function waitChilds (): void
212
235
{
236
+ $ i = 0 ;
213
237
while ($ this ->hasChilds ()){
214
238
self ::breakpoint ('there are still childs ' );
239
+ if ($ i % 100 === 0 ){
240
+ $ this ->checkChilds ();
241
+ }
215
242
usleep (10000 );
216
243
}
217
244
}
@@ -221,6 +248,12 @@ public function wait(): void
221
248
$ this ->waitQueue ();
222
249
$ this ->waitChilds ();
223
250
}
251
+
252
+ public static function isProcessRunning (int $ pid ): bool
253
+ {
254
+ return posix_getpgid ($ pid ) !== false ;
255
+ }
256
+
224
257
}
225
258
226
259
0 commit comments