1212use yii \di \Instance ;
1313use yii \queue \cli \Queue as CliQueue ;
1414use yii \redis \Connection ;
15+ use yii \redis \Mutex ;
1516
1617/**
1718 * Redis Queue.
@@ -24,10 +25,25 @@ class Queue extends CliQueue
2425 * @var Connection|array|string
2526 */
2627 public $ redis = 'redis ' ;
28+
29+ /**
30+ * @var Mutex|array|string
31+ */
32+ public $ mutex = [
33+ 'class ' => Mutex::class,
34+ 'redis ' => 'redis ' ,
35+ ];
36+
37+ /**
38+ * @var integer
39+ */
40+ public $ mutexTimeout = 3 ;
41+
2742 /**
2843 * @var string
2944 */
3045 public $ channel = 'queue ' ;
46+
3147 /**
3248 * @var string command class name
3349 */
@@ -41,6 +57,7 @@ public function init()
4157 {
4258 parent ::init ();
4359 $ this ->redis = Instance::ensure ($ this ->redis , Connection::class);
60+ $ this ->mutex = Instance::ensure ($ this ->mutex , Mutex::class);
4461 }
4562
4663 /**
@@ -56,11 +73,14 @@ public function run($repeat, $timeout = 0)
5673 {
5774 return $ this ->runWorker (function (callable $ canContinue ) use ($ repeat , $ timeout ) {
5875 while ($ canContinue ()) {
59- if (($ payload = $ this ->reserve ($ timeout )) !== null ) {
76+ if ($ this -> acquire () && ($ payload = $ this ->reserve ($ timeout )) !== null ) {
6077 list ($ id , $ message , $ ttr , $ attempt ) = $ payload ;
6178 if ($ this ->handleMessage ($ id , $ message , $ ttr , $ attempt )) {
6279 $ this ->delete ($ id );
6380 }
81+
82+ $ this ->release ();
83+
6484 } elseif (!$ repeat ) {
6585 break ;
6686 }
@@ -95,10 +115,15 @@ public function status($id)
95115 */
96116 public function clear ()
97117 {
98- while (!$ this ->redis -> set ( " $ this -> channel .moving_lock " , true , ' NX ' )) {
118+ while (!$ this ->acquire ( 0 )) {
99119 usleep (10000 );
100120 }
101- $ this ->redis ->executeCommand ('DEL ' , $ this ->redis ->keys ("$ this ->channel .* " ));
121+
122+ try {
123+ $ this ->redis ->executeCommand ('DEL ' , $ this ->redis ->keys ("$ this ->channel .* " ));
124+ } finally {
125+ $ this ->release ();
126+ }
102127 }
103128
104129 /**
@@ -110,19 +135,25 @@ public function clear()
110135 */
111136 public function remove ($ id )
112137 {
113- while (!$ this ->redis -> set ( " $ this -> channel .moving_lock " , true , ' NX ' , ' EX ' , 1 )) {
138+ while (!$ this ->acquire ( 0 )) {
114139 usleep (10000 );
115140 }
116- if ($ this ->redis ->hdel ("$ this ->channel .messages " , $ id )) {
117- $ this ->redis ->zrem ("$ this ->channel .delayed " , $ id );
118- $ this ->redis ->zrem ("$ this ->channel .reserved " , $ id );
119- $ this ->redis ->lrem ("$ this ->channel .waiting " , 0 , $ id );
120- $ this ->redis ->hdel ("$ this ->channel .attempts " , $ id );
121141
122- return true ;
123- }
142+ try {
143+ if ($ this ->redis ->hdel ("$ this ->channel .messages " , $ id )) {
144+ $ this ->redis ->zrem ("$ this ->channel .delayed " , $ id );
145+ $ this ->redis ->zrem ("$ this ->channel .reserved " , $ id );
146+ $ this ->redis ->lrem ("$ this ->channel .waiting " , 0 , $ id );
147+ $ this ->redis ->hdel ("$ this ->channel .attempts " , $ id );
148+
149+ return true ;
150+ }
151+
152+ return false ;
124153
125- return false ;
154+ } finally {
155+ $ this ->release ();
156+ }
126157 }
127158
128159 /**
@@ -131,11 +162,9 @@ public function remove($id)
131162 */
132163 protected function reserve ($ timeout )
133164 {
134- // Moves delayed and reserved jobs into waiting list with lock for one second
135- if ($ this ->redis ->set ("$ this ->channel .moving_lock " , true , 'NX ' , 'EX ' , 1 )) {
136- $ this ->moveExpired ("$ this ->channel .delayed " );
137- $ this ->moveExpired ("$ this ->channel .reserved " );
138- }
165+ // Moves delayed and reserved jobs into waiting list
166+ $ this ->moveExpired ("$ this ->channel .delayed " );
167+ $ this ->moveExpired ("$ this ->channel .reserved " );
139168
140169 // Find a new waiting message
141170 $ id = null ;
@@ -201,4 +230,25 @@ protected function pushMessage($message, $ttr, $delay, $priority)
201230
202231 return $ id ;
203232 }
233+
234+ /**
235+ * Acquire the lock.
236+ *
237+ * @return boolean
238+ */
239+ protected function acquire ($ timeout = null )
240+ {
241+ return $ this ->mutex ->acquire (__CLASS__ . $ this ->channel , $ timeout ?? $ this ->mutexTimeout );
242+ }
243+
244+ /**
245+ * Release the lock.
246+ *
247+ * @return boolean
248+ */
249+ protected function release ()
250+ {
251+ return $ this ->mutex ->release (__CLASS__ . $ this ->channel );
252+ }
253+
204254}
0 commit comments