Skip to content

feat: add support for task locks #60

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions src/Commands/QueueWork.php
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,15 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
timer()->start('work');
$payload = $work->payload;

$payloadMetadata = null;

try {
// Load payload metadata
$payloadMetadata = PayloadMetadata::fromArray($payload['metadata'] ?? []);

// Renew lock if needed
$this->renewLock($payloadMetadata);

$class = $config->resolveJobClass($payload['job']);
$job = new $class($payload['data']);
$job->process();
Expand All @@ -250,9 +258,7 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
CLI::write('The processing of this job was successful', 'green');

// Check chained jobs
if (isset($payload['metadata']) && $payload['metadata'] !== []) {
$this->processNextJobInChain($payload['metadata']);
}
$this->processNextJobInChain($payloadMetadata);
} catch (Throwable $err) {
if (isset($job) && ++$work->attempts < ($tries ?? $job->getTries())) {
// Schedule for later
Expand All @@ -263,6 +269,9 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
}
CLI::write('The processing of this job failed', 'red');
} finally {
// Remove lock if needed
$this->clearLock($payloadMetadata);

timer()->stop('work');
CLI::write(sprintf('It took: %s sec', timer()->getElapsedTime('work')) . PHP_EOL, 'cyan');
}
Expand All @@ -271,10 +280,8 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
/**
* Process the next job in the chain
*/
private function processNextJobInChain(array $payloadMetadata): void
private function processNextJobInChain(PayloadMetadata $payloadMetadata): void
{
$payloadMetadata = PayloadMetadata::fromArray($payloadMetadata);

if (! $payloadMetadata->hasChainedJobs()) {
return;
}
Expand Down Expand Up @@ -305,6 +312,40 @@ private function processNextJobInChain(array $payloadMetadata): void
CLI::write(sprintf('Chained job: %s has been placed in the queue: %s', $nextPayload->getJob(), $nextPayload->getQueue()), 'green');
}

/**
* Renew task lock
*/
private function renewLock(PayloadMetadata $payloadMetadata): void
{
if (! $payloadMetadata->has('taskLockTTL') || ! $payloadMetadata->has('taskLockKey')) {
return;
}

$ttl = $payloadMetadata->get('taskLockTTL');
$key = $payloadMetadata->get('taskLockKey');

// Permanent lock, no need to renew
if ($ttl === 0) {
return;
}

cache()->save($key, [], $ttl);
}

/**
* Remove task lock
*/
private function clearLock(PayloadMetadata $payloadMetadata): void
{
if (! $payloadMetadata->has('taskLockKey')) {
return;
}

$key = $payloadMetadata->get('taskLockKey');

cache()->delete($key);
}

private function maxJobsCheck(int $maxJobs, int $countJobs): bool
{
if ($maxJobs > 0 && $countJobs >= $maxJobs) {
Expand Down
185 changes: 182 additions & 3 deletions tests/Commands/QueueWorkTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

namespace Tests\Commands;

use CodeIgniter\Cache\CacheInterface;
use CodeIgniter\Config\Services;
use CodeIgniter\I18n\Time;
use CodeIgniter\Queue\Models\QueueJobModel;
use CodeIgniter\Test\Filters\CITestStreamFilter;
Expand Down Expand Up @@ -123,13 +125,17 @@ public function testRunWithChainedQueueSucceed(): void
'job' => 'success',
'data' => ['key' => 'value'],
'metadata' => [
'queue' => 'queue',
'queue' => 'test',
'chainedJobs' => [
[
'job' => 'success', 'data' => ['key2' => 'value2'], 'metadata' => [
'job' => 'success',
'data' => [
'key3' => 'value3',
],
'metadata' => [
'queue' => 'queue',
'priority' => 'high',
'delay' => 10,
'delay' => 30,
],
],
],
Expand All @@ -154,5 +160,178 @@ public function testRunWithChainedQueueSucceed(): void
$this->assertSame('The processing of this job was successful', $this->getLine(4));
$this->assertSame('Chained job: success has been placed in the queue: queue', $this->getLine(5));
$this->assertSame('No job available. Stopping.', $this->getLine(8));

$this->seeInDatabase('queue_jobs', [
'queue' => 'queue',
'payload' => json_encode([
'job' => 'success',
'data' => ['key3' => 'value3'],
'metadata' => [
'queue' => 'queue',
'priority' => 'high',
'delay' => 30,
],
]),
]);
}

public function testRunWithTaskLock(): void
{
$lockKey = 'test_lock_key';
$lockTTL = 300; // 5 minutes

Time::setTestNow('2023-12-19 14:15:16');

$cache = $this->createMock(CacheInterface::class);

// Set up expectations
$cache->expects($this->once())
->method('save')
->with($lockKey, $this->anything(), $lockTTL)
->willReturn(true);

$cache->expects($this->once())
->method('delete')
->with($lockKey)
->willReturn(true);

// Replace the cache service
Services::injectMock('cache', $cache);

fake(QueueJobModel::class, [
'connection' => 'database',
'queue' => 'test',
'payload' => [
'job' => 'success',
'data' => ['key' => 'value'],
'metadata' => [
'taskLockKey' => $lockKey,
'taskLockTTL' => $lockTTL,
'queue' => 'test',
],
],
'priority' => 'default',
'status' => 0,
'attempts' => 0,
'available_at' => 1_702_977_074,
]);

CITestStreamFilter::registration();
CITestStreamFilter::addOutputFilter();

$this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty'));
$this->parseOutput(CITestStreamFilter::$buffer);

CITestStreamFilter::removeOutputFilter();

$this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0));
$this->assertSame('Starting a new job: success, with ID: 1', $this->getLine(3));
$this->assertSame('The processing of this job was successful', $this->getLine(4));
}

public function testRunWithPermanentTaskLock(): void
{
$lockKey = 'permanent_lock_key';
$lockTTL = 0; // Permanent lock

Time::setTestNow('2023-12-19 14:15:16');

$cache = $this->createMock(CacheInterface::class);

// For permanent lock (TTL=0), save should NOT be called
$cache->expects($this->never())
->method('save');

$cache->expects($this->once())
->method('delete')
->with($lockKey)
->willReturn(true);

// Replace the cache service
Services::injectMock('cache', $cache);

fake(QueueJobModel::class, [
'connection' => 'database',
'queue' => 'test',
'payload' => [
'job' => 'success',
'data' => ['key4' => 'value4'],
'metadata' => [
'taskLockKey' => $lockKey,
'taskLockTTL' => $lockTTL,
'queue' => 'test',
],
],
'priority' => 'default',
'status' => 0,
'attempts' => 0,
'available_at' => 1_702_977_074,
]);

CITestStreamFilter::registration();
CITestStreamFilter::addOutputFilter();

$this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty'));
$this->parseOutput(CITestStreamFilter::$buffer);

CITestStreamFilter::removeOutputFilter();

$this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0));
$this->assertSame('Starting a new job: success, with ID: 1', $this->getLine(3));
$this->assertSame('The processing of this job was successful', $this->getLine(4));
}

public function testLockClearedOnFailure(): void
{
$lockKey = 'failure_lock_key';
$lockTTL = 300;

Time::setTestNow('2023-12-19 14:15:16');

$cache = $this->createMock(CacheInterface::class);

// Set up expectations
$cache->expects($this->once())
->method('save')
->with($lockKey, $this->anything(), $lockTTL)
->willReturn(true);

$cache->expects($this->once())
->method('delete')
->with($lockKey)
->willReturn(true);

// Replace the cache service
Services::injectMock('cache', $cache);

fake(QueueJobModel::class, [
'connection' => 'database',
'queue' => 'test',
'payload' => [
'job' => 'failure',
'data' => ['key' => 'value'],
'metadata' => [
'taskLockKey' => $lockKey,
'taskLockTTL' => $lockTTL,
'queue' => 'test',
],
],
'priority' => 'default',
'status' => 0,
'attempts' => 0,
'available_at' => 1_702_977_074,
]);

CITestStreamFilter::registration();
CITestStreamFilter::addOutputFilter();

$this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty'));
$this->parseOutput(CITestStreamFilter::$buffer);

CITestStreamFilter::removeOutputFilter();

$this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0));
$this->assertSame('Starting a new job: failure, with ID: 1', $this->getLine(3));
$this->assertSame('The processing of this job failed', $this->getLine(4));
}
}
Loading