Skip to content

Commit f0df034

Browse files
committed
fix #173
1 parent d04b4b3 commit f0df034

File tree

1 file changed

+78
-22
lines changed

1 file changed

+78
-22
lines changed

src/drivers/redis/Queue.php

Lines changed: 78 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use yii\di\Instance;
1313
use yii\queue\cli\Queue as CliQueue;
1414
use yii\redis\Connection;
15+
use yii\redis\Mutex;
1516

1617
/**
1718
* Redis Queue.
@@ -24,10 +25,25 @@ class Queue extends CliQueue
2425
* @var Connection|array|string
2526
*/
2627
public $redis = 'redis';
28+
29+
/**
30+
* @var Mutex|array|string
31+
*/
32+
public $mutex = [
33+
'class' => Mutex::class,
34+
'redis' => 'redis',
35+
];
36+
37+
/**
38+
* @var integer
39+
*/
40+
public $mutexTimeout = 3;
41+
2742
/**
2843
* @var string
2944
*/
3045
public $channel = 'queue';
46+
3147
/**
3248
* @var string command class name
3349
*/
@@ -41,6 +57,7 @@ public function init()
4157
{
4258
parent::init();
4359
$this->redis = Instance::ensure($this->redis, Connection::class);
60+
$this->mutex = Instance::ensure($this->mutex, Mutex::class);
4461
}
4562

4663
/**
@@ -56,15 +73,22 @@ public function run($repeat, $timeout = 0)
5673
{
5774
return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
5875
while ($canContinue()) {
59-
if (($payload = $this->reserve($timeout)) !== null) {
60-
list($id, $message, $ttr, $attempt) = $payload;
61-
if ($this->handleMessage($id, $message, $ttr, $attempt)) {
62-
$this->delete($id);
76+
if ($this->acquire()) {
77+
if (($payload = $this->reserve($timeout)) !== null) {
78+
list($id, $message, $ttr, $attempt) = $payload;
79+
if ($this->handleMessage($id, $message, $ttr, $attempt)) {
80+
$this->delete($id);
81+
}
82+
83+
} elseif (!$repeat) {
84+
break;
6385
}
64-
} elseif (!$repeat) {
65-
break;
86+
87+
$this->release();
6688
}
6789
}
90+
91+
$this->release();
6892
});
6993
}
7094

@@ -95,10 +119,15 @@ public function status($id)
95119
*/
96120
public function clear()
97121
{
98-
while (!$this->redis->set("$this->channel.moving_lock", true, 'NX')) {
122+
while (!$this->acquire(0)) {
99123
usleep(10000);
100124
}
101-
$this->redis->executeCommand('DEL', $this->redis->keys("$this->channel.*"));
125+
126+
try {
127+
$this->redis->executeCommand('DEL', $this->redis->keys("$this->channel.*"));
128+
} finally {
129+
$this->release();
130+
}
102131
}
103132

104133
/**
@@ -110,19 +139,25 @@ public function clear()
110139
*/
111140
public function remove($id)
112141
{
113-
while (!$this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
142+
while (!$this->acquire(0)) {
114143
usleep(10000);
115144
}
116-
if ($this->redis->hdel("$this->channel.messages", $id)) {
117-
$this->redis->zrem("$this->channel.delayed", $id);
118-
$this->redis->zrem("$this->channel.reserved", $id);
119-
$this->redis->lrem("$this->channel.waiting", 0, $id);
120-
$this->redis->hdel("$this->channel.attempts", $id);
121145

122-
return true;
123-
}
146+
try {
147+
if ($this->redis->hdel("$this->channel.messages", $id)) {
148+
$this->redis->zrem("$this->channel.delayed", $id);
149+
$this->redis->zrem("$this->channel.reserved", $id);
150+
$this->redis->lrem("$this->channel.waiting", 0, $id);
151+
$this->redis->hdel("$this->channel.attempts", $id);
152+
153+
return true;
154+
}
155+
156+
return false;
124157

125-
return false;
158+
} finally {
159+
$this->release();
160+
}
126161
}
127162

128163
/**
@@ -131,11 +166,9 @@ public function remove($id)
131166
*/
132167
protected function reserve($timeout)
133168
{
134-
// Moves delayed and reserved jobs into waiting list with lock for one second
135-
if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
136-
$this->moveExpired("$this->channel.delayed");
137-
$this->moveExpired("$this->channel.reserved");
138-
}
169+
// Moves delayed and reserved jobs into waiting list
170+
$this->moveExpired("$this->channel.delayed");
171+
$this->moveExpired("$this->channel.reserved");
139172

140173
// Find a new waiting message
141174
$id = null;
@@ -201,4 +234,27 @@ protected function pushMessage($message, $ttr, $delay, $priority)
201234

202235
return $id;
203236
}
237+
238+
/**
239+
* Acquire the lock.
240+
*
241+
* @return boolean
242+
*/
243+
protected function acquire($timeout = null)
244+
{
245+
$timeout = $timeout !== null ? $timeout : $this->mutexTimeout;
246+
247+
return $this->mutex->acquire(__CLASS__ . $this->channel, $timeout);
248+
}
249+
250+
/**
251+
* Release the lock.
252+
*
253+
* @return boolean
254+
*/
255+
protected function release()
256+
{
257+
return $this->mutex->release(__CLASS__ . $this->channel);
258+
}
259+
204260
}

0 commit comments

Comments
 (0)