Skip to content

Commit

Permalink
[10.x] Fixes Batch Callbacks not triggering if job timeout while in t…
Browse files Browse the repository at this point in the history
…ransaction (#48961)

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* Apply fixes from StyleCI

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* Apply fixes from StyleCI

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* Apply fixes from StyleCI

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* Update BatchableTransactionTest.php

* Apply fixes from StyleCI

* wip

* wip

Signed-off-by: Mior Muhammad Zaki <[email protected]>

* formatting

* Apply fixes from StyleCI

* formatting

* formatting

* Apply fixes from StyleCI

* fix test

---------

Signed-off-by: Mior Muhammad Zaki <[email protected]>
Co-authored-by: StyleCI Bot <[email protected]>
Co-authored-by: Taylor Otwell <[email protected]>
  • Loading branch information
3 people authored Nov 10, 2023
1 parent 95e924d commit 20085fe
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/Illuminate/Bus/BatchRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

use Closure;

/**
* @method void rollBack()
*/
interface BatchRepository
{
/**
Expand Down
10 changes: 10 additions & 0 deletions src/Illuminate/Bus/DatabaseBatchRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,16 @@ public function transaction(Closure $callback)
return $this->connection->transaction(fn () => $callback());
}

/**
* Rollback the last database transaction for the connection.
*
* @return void
*/
public function rollBack()
{
$this->connection->rollBack();
}

/**
* Serialize the given value.
*
Expand Down
23 changes: 23 additions & 0 deletions src/Illuminate/Queue/Jobs/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

namespace Illuminate\Queue\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\BatchRepository;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\ManuallyFailedException;
use Illuminate\Queue\TimeoutExceededException;
use Illuminate\Support\InteractsWithTime;
use Throwable;

abstract class Job
{
Expand Down Expand Up @@ -183,6 +187,25 @@ public function fail($e = null)
return;
}

$commandName = $this->payload()['data']['commandName'] ?? false;

// If the exception is due to a job timing out, we need to rollback the current
// database transaction so that the failed job count can be incremented with
// the proper value. Otherwise, the current transaction will never commit.
if ($e instanceof TimeoutExceededException &&
$commandName &&
in_array(Batchable::class, class_uses_recursive($commandName))) {
$batchRepository = $this->resolve(BatchRepository::class);

if (method_exists($batchRepository, 'rollBack')) {
try {
$batchRepository->rollBack();
} catch (Throwable $e) {
// ...
}
}
}

try {
// If the job has failed, we will delete it, call the "failed" method and then call
// an event indicating the job has failed so it can be logged if needed. This is
Expand Down
65 changes: 65 additions & 0 deletions tests/Integration/Database/Queue/BatchableTransactionTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

namespace Illuminate\Tests\Integration\Database\Queue;

use Illuminate\Foundation\Testing\DatabaseMigrations;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\DB;
use Illuminate\Tests\Integration\Database\DatabaseTestCase;
use Orchestra\Testbench\Attributes\WithMigration;
use PHPUnit\Framework\Attributes\RequiresPhpExtension;
use Symfony\Component\Process\Exception\ProcessSignaledException;
use Throwable;

use function Orchestra\Testbench\remote;

#[RequiresPhpExtension('pcntl')]
#[WithMigration('laravel', 'queue')]
class BatchableTransactionTest extends DatabaseTestCase
{
use DatabaseMigrations;

protected function defineEnvironment($app)
{
parent::defineEnvironment($app);

$config = $app['config'];

if ($config->get('database.default') === 'testing') {
$this->markTestSkipped('Test does not support using :memory: database connection');
}

$config->set(['queue.default' => 'database']);
}

public function testItCanHandleTimeoutJob()
{
Bus::batch([new Fixtures\TimeOutJobWithTransaction()])
->allowFailures()
->dispatch();

$this->assertSame(1, DB::table('jobs')->count());
$this->assertSame(0, DB::table('failed_jobs')->count());
$this->assertSame(1, DB::table('job_batches')->count());

try {
remote('queue:work --stop-when-empty', [
'DB_CONNECTION' => config('database.default'),
'QUEUE_CONNECTION' => config('queue.default'),
])->run();
} catch (Throwable $e) {
$this->assertInstanceOf(ProcessSignaledException::class, $e);
$this->assertSame('The process has been signaled with signal "9".', $e->getMessage());
}

$this->assertSame(0, DB::table('jobs')->count());
$this->assertSame(1, DB::table('failed_jobs')->count());

$this->assertDatabaseHas('job_batches', [
'total_jobs' => 1,
'pending_jobs' => 1,
'failed_jobs' => 1,
'failed_job_ids' => json_encode(DB::table('failed_jobs')->pluck('uuid')->all()),
]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace Illuminate\Tests\Integration\Database\Queue\Fixtures;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\DB;

class TimeOutJobWithTransaction implements ShouldQueue
{
use InteractsWithQueue, Queueable, Batchable;

public int $tries = 1;
public int $timeout = 2;

public function handle(): void
{
DB::transaction(fn () => sleep(20));
}
}
2 changes: 1 addition & 1 deletion tests/Queue/QueueBeanstalkdJobTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function testFireProperlyCallsTheJobHandler()
public function testFailProperlyCallsTheJobHandler()
{
$job = $this->getJob();
$job->getPheanstalkJob()->shouldReceive('getData')->once()->andReturn(json_encode(['job' => 'foo', 'uuid' => 'test-uuid', 'data' => ['data']]));
$job->getPheanstalkJob()->shouldReceive('getData')->andReturn(json_encode(['job' => 'foo', 'uuid' => 'test-uuid', 'data' => ['data']]));
$job->getContainer()->shouldReceive('make')->once()->with('foo')->andReturn($handler = m::mock(BeanstalkdJobTestFailedTest::class));
$job->getPheanstalk()->shouldReceive('delete')->once()->with($job->getPheanstalkJob())->andReturnSelf();
$handler->shouldReceive('failed')->once()->with(['data'], m::type(Exception::class), 'test-uuid');
Expand Down

0 comments on commit 20085fe

Please sign in to comment.