Skip to content

Commit

Permalink
Merge pull request #22 from deliciousbrains/allowed-job-classes
Browse files Browse the repository at this point in the history
Add ability to restrict allowed job subclasses
  • Loading branch information
eriktorsner authored Feb 8, 2024
2 parents a3031ab + e4282f5 commit 2cc2007
Show file tree
Hide file tree
Showing 12 changed files with 371 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ end_of_line = lf
indent_size = 4
tab_width = 4
indent_style = tab
insert_final_newline = false
insert_final_newline = true
trim_trailing_whitespace = true

[*.txt]
Expand Down
68 changes: 67 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,72 @@ You can also specify the number of times a job should be attempted before being
wp_queue()->cron( 3 );
```

## Restricting Allowed Job Classes

The queue will handle any subclass of `WP_Queue\Job`. For better security,
it is strongly recommended that the `DatabaseConnection` be instantiated with a list
of allowed `Job` subclasses that are expected to be handled.

You can do that by passing in an array of `Job` subclasses when the `Queue` sets
up its database connection, or by having a database connection that only handles
certain `Job` subclasses.

```php
class Connection extends DatabaseConnection {
public function __construct( $wpdb, array $allowed_job_classes = [] ) {
// If a connection is always dealing with the same Jobs,
// you could explicitly set the allowed job classes here
// rather than pass them in.
if ( empty( $allowed_job_classes ) ) {
$allowed_job_classes = [ Subscribe_User_Job::class ];
}

parent::__construct( $wpdb, $allowed_job_classes );

$this->jobs_table = $wpdb->base_prefix . 'myplugin_subs_jobs';
$this->failures_table = $wpdb->base_prefix . 'myplugin_subs_failures';
}
}

class Subscribe_User_Queue extends Queue {
public function __construct() {
global $wpdb;

// Set up custom database queue, with list of allowed job classes.
parent::__construct( new Connection( $wpdb, [ Subscribe_User_Job::class ] ) );

// Other set up stuff ...
}
}

class MyPlugin {
/**
* @var Subscribe_User_Queue
*/
private $queue;

public function __construct() {
// Part of bring-up ...
$this->queue = new Subscribe_User_Queue();

// Other stuff ...
}

protected function subscribe_user( $user_id ) {
$this->queue->push( new Subscribe_User_Job( $user_id ) );
}

/**
* Triggered by cron or background process etc.
*
* @return bool
*/
protected function process_queue_job() {
return $this->queue->worker()->process();
}
}
```

## Local Development

When developing locally you may want jobs processed instantly, instead of them being pushed to the queue. This can be useful for debugging jobs via Xdebug. Add the following filter to use the `sync` connection.
Expand Down Expand Up @@ -203,4 +269,4 @@ merge unless they pass, and the branch is up-to-date with `master`.

## License

WP Queue is open-sourced software licensed under the [MIT license](https://opensource.org/licenses/MIT).
WP Queue is open-sourced software licensed under the [MIT license](https://opensource.org/licenses/MIT).
2 changes: 1 addition & 1 deletion src/WP_Queue/Connections/ConnectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public function delete( $job );
*
* @param Job $job
*/
public function release( $job );
public function release( Job $job );

/**
* Push a job onto the failure queue.
Expand Down
59 changes: 46 additions & 13 deletions src/WP_Queue/Connections/DatabaseConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@

use Carbon\Carbon;
use Exception;
use WP_Queue\Exceptions\InvalidJobTypeException;
use WP_Queue\Job;

/**
* An implementation of the ConnectionInterface that uses custom database tables
* for storing queue jobs.
*/
class DatabaseConnection implements ConnectionInterface {

/**
Expand All @@ -23,15 +28,22 @@ class DatabaseConnection implements ConnectionInterface {
*/
protected $failures_table;

/**
* @var array|bool
*/
protected $allowed_job_classes = [];

/**
* DatabaseQueue constructor.
*
* @param \wpdb $wpdb
* @param array $allowed_job_classes Job classes that may be handled, default any Job subclass.
*/
public function __construct( $wpdb ) {
$this->database = $wpdb;
$this->jobs_table = $this->database->prefix . 'queue_jobs';
$this->failures_table = $this->database->prefix . 'queue_failures';
public function __construct( $wpdb, array $allowed_job_classes = [] ) {
$this->database = $wpdb;
$this->allowed_job_classes = $allowed_job_classes;
$this->jobs_table = $this->database->prefix . 'queue_jobs';
$this->failures_table = $this->database->prefix . 'queue_failures';
}

/**
Expand Down Expand Up @@ -80,7 +92,9 @@ public function pop() {

$job = $this->vitalize_job( $raw_job );

$this->reserve( $job );
if ( $job && is_a( $job, Job::class ) ) {
$this->reserve( $job );
}

return $job;
}
Expand All @@ -93,8 +107,17 @@ public function pop() {
* @return bool
*/
public function delete( $job ) {
if ( is_a( $job, Job::class ) ) {
$id = $job->id();
} elseif ( is_object( $job ) && property_exists( $job, 'id' ) ) {
$raw_job = (object) $job;
$id = $raw_job->id;
} else {
return false;
}

$where = [
'id' => $job->id(),
'id' => $id,
];

if ( $this->database->delete( $this->jobs_table, $where ) ) {
Expand All @@ -111,7 +134,7 @@ public function delete( $job ) {
*
* @return bool
*/
public function release( $job ) {
public function release( Job $job ) {
$data = [
'job' => serialize( $job ),
'attempts' => $job->attempts(),
Expand Down Expand Up @@ -158,7 +181,7 @@ public function failure( $job, Exception $exception ) {
* @return int
*/
public function jobs() {
$sql = "SELECT COUNT(*) FROM {$this->jobs_table}";
$sql = "SELECT COUNT(*) FROM $this->jobs_table";

return (int) $this->database->get_var( $sql );
}
Expand All @@ -169,7 +192,7 @@ public function jobs() {
* @return int
*/
public function failed_jobs() {
$sql = "SELECT COUNT(*) FROM {$this->failures_table}";
$sql = "SELECT COUNT(*) FROM $this->failures_table";

return (int) $this->database->get_var( $sql );
}
Expand All @@ -179,7 +202,7 @@ public function failed_jobs() {
*
* @param Job $job
*/
protected function reserve( $job ) {
protected function reserve( Job $job ) {
$data = [
'reserved_at' => $this->datetime(),
];
Expand Down Expand Up @@ -208,10 +231,21 @@ protected function release_reserved() {
*
* @param mixed $raw_job
*
* @return Job
* @return Job|bool
*/
protected function vitalize_job( $raw_job ) {
$job = unserialize( $raw_job->job );
$options = [];
if ( ! empty( $this->allowed_job_classes ) ) {
$options['allowed_classes'] = $this->allowed_job_classes;
}

$job = unserialize( $raw_job->job, $options );

if ( ! is_a( $job, Job::class ) ) {
$this->failure( $raw_job, new InvalidJobTypeException() );

return false;
}

$job->set_id( $raw_job->id );
$job->set_attempts( $raw_job->attempts );
Expand Down Expand Up @@ -255,5 +289,4 @@ protected function format_exception( Exception $exception ) {

return $string;
}

}
20 changes: 18 additions & 2 deletions src/WP_Queue/Connections/RedisConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
use Exception;
use WP_Queue\Job;

/**
* An incomplete example of how a new ConnectionInterface could be set up
* for storing queue jobs.
*
* Please see the DatabaseConnection class for a complete working implementation.
*/
class RedisConnection implements ConnectionInterface {

/**
Expand All @@ -16,6 +22,7 @@ class RedisConnection implements ConnectionInterface {
* @return bool|int
*/
public function push( Job $job, $delay = 0 ) {
return false;
}

/**
Expand All @@ -24,22 +31,29 @@ public function push( Job $job, $delay = 0 ) {
* @return bool|Job
*/
public function pop() {
return false;
}

/**
* Delete a job from the queue.
*
* @param Job $job
*
* @return bool
*/
public function delete( $job ) {
return false;
}

/**
* Release a job back onto the queue.
*
* @param Job $job
*
* @return bool
*/
public function release( $job ) {
public function release( Job $job ) {
return false;
}

/**
Expand All @@ -51,6 +65,7 @@ public function release( $job ) {
* @return bool
*/
public function failure( $job, Exception $exception ) {
return false;
}

/**
Expand All @@ -59,6 +74,7 @@ public function failure( $job, Exception $exception ) {
* @return int
*/
public function jobs() {
return 0;
}

/**
Expand All @@ -67,6 +83,6 @@ public function jobs() {
* @return int
*/
public function failed_jobs() {
return 0;
}

}
9 changes: 8 additions & 1 deletion src/WP_Queue/Connections/SyncConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@
use Exception;
use WP_Queue\Job;

/**
* A minimal implementation of the ConnectionInterface that handles a pushed
* Job immediately.
*
* Useful for local development when only wanting to debug how Job items are handled.
*/
class SyncConnection implements ConnectionInterface {

/**
* Execute the job immediately without pushing to the queue.
*
Expand Down Expand Up @@ -47,7 +54,7 @@ public function delete( $job ) {
*
* @return bool
*/
public function release( $job ) {
public function release( Job $job ) {
return false;
}

Expand Down
11 changes: 11 additions & 0 deletions src/WP_Queue/Exceptions/InvalidJobTypeException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

namespace WP_Queue\Exceptions;

use Exception;

/**
* Exception for when job data includes an unrecognized class.
*/
class InvalidJobTypeException extends Exception {
}
Loading

0 comments on commit 2cc2007

Please sign in to comment.