2
2
3
3
namespace skrtdev \async ;
4
4
5
+ use Throwable ;
5
6
6
7
class Pool{
7
8
@@ -18,11 +19,17 @@ class Pool{
18
19
19
20
protected static self $ default_pool ;
20
21
22
+ /**
23
+ * @throws MissingExtensionException
24
+ */
21
25
public function __construct (?int $ max_childs = null , bool $ kill_childs = true )
22
26
{
23
27
if (!extension_loaded ('pcntl ' )){
24
28
throw new MissingExtensionException ('PCNTL Extension is missing in your PHP build ' );
25
29
}
30
+ if (!extension_loaded ('posix ' )){
31
+ throw new MissingExtensionException ('POSIX Extension is missing in your PHP build ' );
32
+ }
26
33
$ this ->pid = getmypid ();
27
34
$ this ->max_childs = $ max_childs ?? (self ::getCoresCount () ?? 1 ) * 10 ;
28
35
$ this ->kill_childs = $ kill_childs ;
@@ -31,7 +38,7 @@ public function __construct(?int $max_childs = null, bool $kill_childs = true)
31
38
32
39
pcntl_async_signals (true );
33
40
34
- pcntl_signal (SIGCHLD , function ($ signo , $ status ) {
41
+ pcntl_signal (SIGCHLD , function () {
35
42
while (true ) {
36
43
$ pid = pcntl_waitpid (-1 , $ processState , WNOHANG | WUNTRACED );
37
44
@@ -67,7 +74,7 @@ protected function checkChilds(): bool
67
74
else {
68
75
self ::breakpoint ("CheckChilds removed $ removed childs " );
69
76
return true ;
70
- };
77
+ }
71
78
}
72
79
73
80
@@ -76,6 +83,9 @@ public function enqueue(callable $callable, array $args = []): void
76
83
$ this ->queue [] = [$ callable , $ args ];
77
84
}
78
85
86
+ /**
87
+ * @throws CouldNotForkException
88
+ */
79
89
protected function _parallel (callable $ callable , array $ args = [])
80
90
{
81
91
self ::breakpoint ('started a parallel ' );
@@ -96,11 +106,19 @@ protected function _parallel(callable $callable, array $args = [])
96
106
if (!$ this ->kill_childs ) {
97
107
pcntl_signal (SIGINT , SIG_IGN );
98
108
}
99
- $ callable (...$ args );
109
+ try {
110
+ $ callable (...$ args );
111
+ }
112
+ catch (Throwable $ e ){
113
+ echo "Uncaught $ e " ;
114
+ }
100
115
exit ;
101
116
}
102
117
}
103
118
119
+ /**
120
+ * @throws CouldNotForkException
121
+ */
104
122
public function parallel (callable $ callable , ...$ args )
105
123
{
106
124
if ($ this ->hasQueue ()){
@@ -262,6 +280,3 @@ public static function getDefaultPool(): self
262
280
}
263
281
264
282
}
265
-
266
-
267
- ?>
0 commit comments