diff --git a/src/Commands/QueueWork.php b/src/Commands/QueueWork.php index bbb81ce..4f2bf22 100644 --- a/src/Commands/QueueWork.php +++ b/src/Commands/QueueWork.php @@ -239,7 +239,15 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i timer()->start('work'); $payload = $work->payload; + $payloadMetadata = null; + try { + // Load payload metadata + $payloadMetadata = PayloadMetadata::fromArray($payload['metadata'] ?? []); + + // Renew lock if needed + $this->renewLock($payloadMetadata); + $class = $config->resolveJobClass($payload['job']); $job = new $class($payload['data']); $job->process(); @@ -250,9 +258,7 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i CLI::write('The processing of this job was successful', 'green'); // Check chained jobs - if (isset($payload['metadata']) && $payload['metadata'] !== []) { - $this->processNextJobInChain($payload['metadata']); - } + $this->processNextJobInChain($payloadMetadata); } catch (Throwable $err) { if (isset($job) && ++$work->attempts < ($tries ?? $job->getTries())) { // Schedule for later @@ -263,6 +269,9 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i } CLI::write('The processing of this job failed', 'red'); } finally { + // Remove lock if needed + $this->clearLock($payloadMetadata); + timer()->stop('work'); CLI::write(sprintf('It took: %s sec', timer()->getElapsedTime('work')) . PHP_EOL, 'cyan'); } @@ -271,10 +280,8 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i /** * Process the next job in the chain */ - private function processNextJobInChain(array $payloadMetadata): void + private function processNextJobInChain(PayloadMetadata $payloadMetadata): void { - $payloadMetadata = PayloadMetadata::fromArray($payloadMetadata); - if (! $payloadMetadata->hasChainedJobs()) { return; } @@ -305,6 +312,40 @@ private function processNextJobInChain(array $payloadMetadata): void CLI::write(sprintf('Chained job: %s has been placed in the queue: %s', $nextPayload->getJob(), $nextPayload->getQueue()), 'green'); } + /** + * Renew task lock + */ + private function renewLock(PayloadMetadata $payloadMetadata): void + { + if (! $payloadMetadata->has('taskLockTTL') || ! $payloadMetadata->has('taskLockKey')) { + return; + } + + $ttl = $payloadMetadata->get('taskLockTTL'); + $key = $payloadMetadata->get('taskLockKey'); + + // Permanent lock, no need to renew + if ($ttl === 0) { + return; + } + + cache()->save($key, [], $ttl); + } + + /** + * Remove task lock + */ + private function clearLock(PayloadMetadata $payloadMetadata): void + { + if (! $payloadMetadata->has('taskLockKey')) { + return; + } + + $key = $payloadMetadata->get('taskLockKey'); + + cache()->delete($key); + } + private function maxJobsCheck(int $maxJobs, int $countJobs): bool { if ($maxJobs > 0 && $countJobs >= $maxJobs) { diff --git a/tests/Commands/QueueWorkTest.php b/tests/Commands/QueueWorkTest.php index dd5a13d..af39092 100644 --- a/tests/Commands/QueueWorkTest.php +++ b/tests/Commands/QueueWorkTest.php @@ -13,6 +13,8 @@ namespace Tests\Commands; +use CodeIgniter\Cache\CacheInterface; +use CodeIgniter\Config\Services; use CodeIgniter\I18n\Time; use CodeIgniter\Queue\Models\QueueJobModel; use CodeIgniter\Test\Filters\CITestStreamFilter; @@ -123,13 +125,17 @@ public function testRunWithChainedQueueSucceed(): void 'job' => 'success', 'data' => ['key' => 'value'], 'metadata' => [ - 'queue' => 'queue', + 'queue' => 'test', 'chainedJobs' => [ [ - 'job' => 'success', 'data' => ['key2' => 'value2'], 'metadata' => [ + 'job' => 'success', + 'data' => [ + 'key3' => 'value3', + ], + 'metadata' => [ 'queue' => 'queue', 'priority' => 'high', - 'delay' => 10, + 'delay' => 30, ], ], ], @@ -154,5 +160,178 @@ public function testRunWithChainedQueueSucceed(): void $this->assertSame('The processing of this job was successful', $this->getLine(4)); $this->assertSame('Chained job: success has been placed in the queue: queue', $this->getLine(5)); $this->assertSame('No job available. Stopping.', $this->getLine(8)); + + $this->seeInDatabase('queue_jobs', [ + 'queue' => 'queue', + 'payload' => json_encode([ + 'job' => 'success', + 'data' => ['key3' => 'value3'], + 'metadata' => [ + 'queue' => 'queue', + 'priority' => 'high', + 'delay' => 30, + ], + ]), + ]); + } + + public function testRunWithTaskLock(): void + { + $lockKey = 'test_lock_key'; + $lockTTL = 300; // 5 minutes + + Time::setTestNow('2023-12-19 14:15:16'); + + $cache = $this->createMock(CacheInterface::class); + + // Set up expectations + $cache->expects($this->once()) + ->method('save') + ->with($lockKey, $this->anything(), $lockTTL) + ->willReturn(true); + + $cache->expects($this->once()) + ->method('delete') + ->with($lockKey) + ->willReturn(true); + + // Replace the cache service + Services::injectMock('cache', $cache); + + fake(QueueJobModel::class, [ + 'connection' => 'database', + 'queue' => 'test', + 'payload' => [ + 'job' => 'success', + 'data' => ['key' => 'value'], + 'metadata' => [ + 'taskLockKey' => $lockKey, + 'taskLockTTL' => $lockTTL, + 'queue' => 'test', + ], + ], + 'priority' => 'default', + 'status' => 0, + 'attempts' => 0, + 'available_at' => 1_702_977_074, + ]); + + CITestStreamFilter::registration(); + CITestStreamFilter::addOutputFilter(); + + $this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty')); + $this->parseOutput(CITestStreamFilter::$buffer); + + CITestStreamFilter::removeOutputFilter(); + + $this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0)); + $this->assertSame('Starting a new job: success, with ID: 1', $this->getLine(3)); + $this->assertSame('The processing of this job was successful', $this->getLine(4)); + } + + public function testRunWithPermanentTaskLock(): void + { + $lockKey = 'permanent_lock_key'; + $lockTTL = 0; // Permanent lock + + Time::setTestNow('2023-12-19 14:15:16'); + + $cache = $this->createMock(CacheInterface::class); + + // For permanent lock (TTL=0), save should NOT be called + $cache->expects($this->never()) + ->method('save'); + + $cache->expects($this->once()) + ->method('delete') + ->with($lockKey) + ->willReturn(true); + + // Replace the cache service + Services::injectMock('cache', $cache); + + fake(QueueJobModel::class, [ + 'connection' => 'database', + 'queue' => 'test', + 'payload' => [ + 'job' => 'success', + 'data' => ['key4' => 'value4'], + 'metadata' => [ + 'taskLockKey' => $lockKey, + 'taskLockTTL' => $lockTTL, + 'queue' => 'test', + ], + ], + 'priority' => 'default', + 'status' => 0, + 'attempts' => 0, + 'available_at' => 1_702_977_074, + ]); + + CITestStreamFilter::registration(); + CITestStreamFilter::addOutputFilter(); + + $this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty')); + $this->parseOutput(CITestStreamFilter::$buffer); + + CITestStreamFilter::removeOutputFilter(); + + $this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0)); + $this->assertSame('Starting a new job: success, with ID: 1', $this->getLine(3)); + $this->assertSame('The processing of this job was successful', $this->getLine(4)); + } + + public function testLockClearedOnFailure(): void + { + $lockKey = 'failure_lock_key'; + $lockTTL = 300; + + Time::setTestNow('2023-12-19 14:15:16'); + + $cache = $this->createMock(CacheInterface::class); + + // Set up expectations + $cache->expects($this->once()) + ->method('save') + ->with($lockKey, $this->anything(), $lockTTL) + ->willReturn(true); + + $cache->expects($this->once()) + ->method('delete') + ->with($lockKey) + ->willReturn(true); + + // Replace the cache service + Services::injectMock('cache', $cache); + + fake(QueueJobModel::class, [ + 'connection' => 'database', + 'queue' => 'test', + 'payload' => [ + 'job' => 'failure', + 'data' => ['key' => 'value'], + 'metadata' => [ + 'taskLockKey' => $lockKey, + 'taskLockTTL' => $lockTTL, + 'queue' => 'test', + ], + ], + 'priority' => 'default', + 'status' => 0, + 'attempts' => 0, + 'available_at' => 1_702_977_074, + ]); + + CITestStreamFilter::registration(); + CITestStreamFilter::addOutputFilter(); + + $this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty')); + $this->parseOutput(CITestStreamFilter::$buffer); + + CITestStreamFilter::removeOutputFilter(); + + $this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0)); + $this->assertSame('Starting a new job: failure, with ID: 1', $this->getLine(3)); + $this->assertSame('The processing of this job failed', $this->getLine(4)); } }