diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..ae35877
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,26 @@
+; This file is for unifying the coding style for different editors and IDEs.
+; More information at http://editorconfig.org
+
+root = true
+
+[*]
+indent_style = space
+indent_size = 4
+end_of_line = lf
+insert_final_newline = true
+trim_trailing_whitespace = true
+
+[*.bat]
+end_of_line = crlf
+
+[*.yml]
+indent_size = 2
+
+[*.xml]
+indent_size = 2
+
+[Makefile]
+indent_style = tab
+
+[*.neon]
+indent_style = tab
\ No newline at end of file
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..6c30e15
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,21 @@
+name: CI
+
+on:
+ push:
+ branches:
+ - 2.next-cake5
+ pull_request:
+ branches:
+ - '*'
+
+permissions:
+ contents: read
+
+jobs:
+ testsuite:
+ uses: cakephp/.github/.github/workflows/testsuite-with-db.yml@5.x
+ secrets: inherit
+
+ cs-stan:
+ uses: cakephp/.github/.github/workflows/cs-stan.yml@5.x
+ secrets: inherit
diff --git a/.phive/phars.xml b/.phive/phars.xml
new file mode 100644
index 0000000..1e8e15f
--- /dev/null
+++ b/.phive/phars.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..c7536ff
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,18 @@
+# Changelog
+
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [2.0] - 2024-04-17
+
+### Added
+
+- First release for CakePHP 5.0
+
+## [1.0] - 2024-04-17
+
+### Added
+
+- First release for CakePHP 4.4
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..7e20154
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,6 @@
+Contributing
+============
+
+This repository follows the [CakeDC Plugin Standard](https://www.cakedc.com/plugin-standard). If you'd like to
+contribute new features, enhancements or bug fixes to the plugin, please read our
+[Contribution Guidelines](https://www.cakedc.com/contribution-guidelines) for detailed instructions.
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..a64ec0e
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2024 Cake Development Corporation
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ef83662
--- /dev/null
+++ b/README.md
@@ -0,0 +1,115 @@
+# CakeDC Queue Monitor Plugin for CakePHP
+
+## Versions and branches
+| CakePHP | CakeDC Queue Monitor Plugin | Tag | Notes |
+|:-------:|:--------------------------------------------------------------------------:|:------------:|:-------|
+| ^5.0 | [2.0.0](https://github.com/CakeDC/cakephp-queue-monitor/tree/2.next-cake5) | 2.next-cake5 | stable |
+| ^4.4 | [1.0.0](https://github.com/CakeDC/cakephp-queue-monitor/tree/1.next-cake4) | 1.next-cake4 | stable |
+
+## Overview
+
+The CakeDC Queue Monitor Plugin adds the ability to monitor jobs in queues that are handled by the
+[CakePHP Queue Plugin](https://github.com/cakephp/queue). This plugin checks the duration of work of
+individual Jobs and sends a notification when this time is exceeded by a configurable value.
+
+## Requirements
+* CakePHP 5.0
+* PHP 8.1+
+
+## Installation
+
+You can install this plugin into your CakePHP application using [composer](https://getcomposer.org).
+
+The recommended way to install composer package is:
+```
+composer require cakedc/queue-monitor
+```
+
+## Configuration
+
+Add QueueMonitorPlugin to your `Application::bootstrap`:
+```php
+use Cake\Http\BaseApplication;
+use CakeDC\QueueMonitor\QueueMonitorPlugin;
+
+class Application extends BaseApplication
+{
+ // ...
+
+ public function bootstrap(): void
+ {
+ parent::bootstrap();
+
+ $this->addPlugin(QueueMonitorPlugin::class);
+ }
+
+ // ...
+}
+
+```
+
+Set up the QueueMonitor configuration in your `config/app_local.php`:
+```php
+// ...
+ 'QueueMonitor' => [
+ // mailer config, the default is `default` mailer, you can ommit
+ // this setting if you use default value
+ 'mailerConfig' => 'myCustomMailer',
+
+ // the default is 30 minutes, you can ommit this setting if you
+ // use the default value
+ 'longJobInMinutes' => 45,
+
+ // the default is 30 days, you can ommit this setting if you
+ // its advised to set this value correctly after queue usage analysis to avoid
+ // high space usage in db
+ 'purgeLogsOlderThanDays' => 10,
+
+ // comma separated list of recipients of notification about long running queue jobs
+ 'notificationRecipients' => 'recipient1@yourdomain.com,recipient2@yourdomain.com,recipient3@yourdomain.com',
+ ],
+// ...
+```
+
+Run the required migrations
+```shell
+bin/cake migrations migrate -p CakeDC/QueueMonitor
+```
+
+For each queue configuration add `listener` setting
+```php
+// ...
+ 'Queue' => [
+ 'default' => [
+ // ...
+ 'listener' => \CakeDC\QueueMonitor\Listener\QueueMonitorListener::class,
+ // ...
+ ]
+ ],
+// ...
+```
+
+## Notification command
+
+To set up notifications when there are long running or possible stuck jobs please use command
+```shell
+bin/cake queue_monitor notify
+```
+
+This command will send notification emails to recipients specified in `QueueMonitor.notificationRecipients`. Best is
+to use it as a cronjob
+
+## Purge command
+
+The logs table may grow overtime, to keep it slim you can use the purge command:
+```shell
+bin/cake queue_monitor purge
+```
+
+This command will purge logs older than value specified in `QueueMonitor.purgeLogsOlderThanDays`, the value is in
+days, the default is 30 days. Best is to use it as a cronjob
+
+## Important
+
+Make sure your Job classes have a property value of maxAttempts because if it's missing, the log table can quickly
+grow to gigantic size in the event of an uncaught exception in Job, Job is re-queued indefinitely in such a case.
diff --git a/composer.json b/composer.json
new file mode 100644
index 0000000..b7e381c
--- /dev/null
+++ b/composer.json
@@ -0,0 +1,74 @@
+{
+ "name": "cakedc/queue-monitor",
+ "description": "CakeDC Queue Monitor plugin for CakePHP",
+ "type": "cakephp-plugin",
+ "license": "MIT",
+ "keywords": [
+ "cakephp",
+ "queue",
+ "queue monitoring",
+ "queue monitor"
+ ],
+ "homepage": "https://github.com/CakeDC/cakephp-queue-monitor",
+ "authors": [
+ {
+ "name": "CakeDC",
+ "homepage": "https://www.cakedc.com",
+ "role": "Author"
+ },
+ {
+ "name": "Others",
+ "homepage": "https://github.com/CakeDC/cakephp-queue-monitor/graphs/contributors"
+ }
+ ],
+ "support": {
+ "issues": "https://github.com/CakeDC/cakephp-queue-monitor/issues",
+ "source": "https://github.com/CakeDC/cakephp-queue-monitor"
+ },
+ "require": {
+ "php": ">=8.1",
+ "cakephp/cakephp": "^5.0.1",
+ "cakephp/queue": "^2.0",
+ "ext-json": "*"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "^10.1.0",
+ "cakephp/migrations": "^4.0.0",
+ "cakephp/cakephp-codesniffer": "^5.1"
+ },
+ "autoload": {
+ "psr-4": {
+ "CakeDC\\QueueMonitor\\": "src/"
+ }
+ },
+ "autoload-dev": {
+ "psr-4": {
+ "CakeDC\\QueueMonitor\\Test\\": "tests/",
+ "Cake\\Test\\": "vendor/cakephp/cakephp/tests/"
+ }
+ },
+ "scripts": {
+ "check": [
+ "@cs-check",
+ "@test"
+ ],
+ "cs-check": "phpcs --colors --parallel=16 -p src/ tests/",
+ "cs-fix": "phpcbf --colors --parallel=16 -p src/ tests/",
+ "phpstan": "tools/phpstan analyse",
+ "psalm": "tools/psalm --show-info=false",
+ "stan": [
+ "@phpstan",
+ "@psalm"
+ ],
+ "stan-tests": "phpstan.phar analyze -c tests/phpstan.neon",
+ "stan-baseline": "phpstan.phar --generate-baseline",
+ "stan-setup": "phive install",
+ "test": "phpunit",
+ "test-coverage": "phpunit --coverage-clover=clover.xml"
+ },
+ "config": {
+ "allow-plugins": {
+ "dealerdirect/phpcodesniffer-composer-installer": true
+ }
+ }
+}
diff --git a/config/Migrations/20231213090000_CreateQueueMonitoringLogs.php b/config/Migrations/20231213090000_CreateQueueMonitoringLogs.php
new file mode 100644
index 0000000..335b709
--- /dev/null
+++ b/config/Migrations/20231213090000_CreateQueueMonitoringLogs.php
@@ -0,0 +1,108 @@
+table('queue_monitoring_logs', ['id' => false, 'primary_key' => ['id']])
+ ->addColumn('id', 'uuid', [
+ 'default' => null,
+ 'limit' => null,
+ 'null' => false,
+ ])
+ ->addColumn('created', 'datetime', [
+ 'default' => null,
+ 'limit' => null,
+ 'null' => false,
+ 'precision' => 6,
+ ])
+ ->addColumn('message_id', 'uuid', [
+ 'default' => null,
+ 'limit' => null,
+ 'null' => false,
+ ])
+ ->addColumn('message_timestamp', 'datetime', [
+ 'default' => null,
+ 'limit' => null,
+ 'null' => false,
+ ])
+ ->addColumn('event', 'tinyinteger', [
+ 'default' => null,
+ 'limit' => null,
+ 'null' => false,
+ ])
+ ->addColumn('job', 'string', [
+ 'default' => null,
+ 'limit' => 255,
+ 'null' => true,
+ ])
+ ->addColumn('exception', 'string', [
+ 'default' => null,
+ 'limit' => 255,
+ 'null' => true,
+ ])
+ ->addColumn('content', 'text', [
+ 'default' => null,
+ 'limit' => 4294967295,
+ 'null' => false,
+ ])
+ ->addIndex(
+ [
+ 'created',
+ ],
+ [
+ 'name' => 'logs_created_index',
+ ]
+ )
+ ->addIndex(
+ [
+ 'event',
+ ],
+ [
+ 'name' => 'logs_event_index',
+ ]
+ )
+ ->addIndex(
+ [
+ 'message_id',
+ ],
+ [
+ 'name' => 'logs_message_id_index',
+ ]
+ )
+ ->create();
+ }
+
+ /**
+ * Down Method.
+ *
+ * More information on this method is available here:
+ * https://book.cakephp.org/phinx/0/en/migrations.html#the-down-method
+ *
+ * @return void
+ */
+ public function down(): void
+ {
+ $this->table('queue_monitoring_logs')->drop()->save();
+ }
+}
diff --git a/phpcs.xml b/phpcs.xml
new file mode 100644
index 0000000..e46a08d
--- /dev/null
+++ b/phpcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon
new file mode 100644
index 0000000..e69de29
diff --git a/phpstan.neon b/phpstan.neon
new file mode 100644
index 0000000..7dc051f
--- /dev/null
+++ b/phpstan.neon
@@ -0,0 +1,10 @@
+includes:
+ - phpstan-baseline.neon
+parameters:
+ level: 8
+ checkMissingIterableValueType: false
+ checkGenericClassInNonGenericObjectType: false
+ bootstrapFiles:
+ - tests/bootstrap.php
+ paths:
+ - src/
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
new file mode 100644
index 0000000..031fa04
--- /dev/null
+++ b/phpunit.xml.dist
@@ -0,0 +1,23 @@
+
+
+
+
+
+
+
+
+
+
+
+ tests/TestCase/
+
+
+
+
+
+
+
diff --git a/psalm.xml b/psalm.xml
new file mode 100644
index 0000000..2119034
--- /dev/null
+++ b/psalm.xml
@@ -0,0 +1,34 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Command/NotifyCommand.php b/src/Command/NotifyCommand.php
new file mode 100644
index 0000000..28c72c8
--- /dev/null
+++ b/src/Command/NotifyCommand.php
@@ -0,0 +1,79 @@
+setDescription(__('Queue Monitoring notifier'));
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function execute(Arguments $args, ConsoleIo $io)
+ {
+ try {
+ $this->queueMonitoringService->notifyAboutLongRunningJobs(
+ (int)Configure::read(
+ 'QueueMonitor.longJobInMinutes',
+ self::DEFAULT_LONG_JOB_IN_MINUTES
+ )
+ );
+ } catch (Exception $e) {
+ $this->log("Failed to send queue stuck notifications, reason: {$e->getMessage()}");
+
+ return self::CODE_ERROR;
+ }
+
+ return self::CODE_SUCCESS;
+ }
+}
diff --git a/src/Command/PurgeCommand.php b/src/Command/PurgeCommand.php
new file mode 100644
index 0000000..70ab430
--- /dev/null
+++ b/src/Command/PurgeCommand.php
@@ -0,0 +1,89 @@
+setDescription(__('Queue Monitoring log purger'));
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function execute(Arguments $args, ConsoleIo $io)
+ {
+ $purgeToDate = $this->queueMonitoringService->getPurgeToDate(
+ (int)Configure::read(
+ 'QueueMonitor.purgeLogsOlderThanDays',
+ self::DEFAULT_PURGE_DAYS_OLD
+ )
+ );
+ $this->log(
+ "Purging queue logs older than {$purgeToDate->toDateTimeString()} UTC",
+ LogLevel::INFO
+ );
+ try {
+ $rowCount = $this->queueMonitoringService->purgeLogs(self::DEFAULT_PURGE_DAYS_OLD);
+ $this->log(
+ "Purged $rowCount queue messages older than {$purgeToDate->toDateTimeString()} UTC",
+ LogLevel::INFO
+ );
+ } catch (Exception $e) {
+ $this->log("Failed puring `queue stuck` logs, reason: {$e->getMessage()}");
+
+ return self::CODE_ERROR;
+ }
+
+ return self::CODE_SUCCESS;
+ }
+}
diff --git a/src/Exception/QueueMonitorException.php b/src/Exception/QueueMonitorException.php
new file mode 100644
index 0000000..911566b
--- /dev/null
+++ b/src/Exception/QueueMonitorException.php
@@ -0,0 +1,22 @@
+QueueMonitoringLogs = $this->fetchTable(LogsTable::class);
+ }
+
+ /**
+ * Implemented events
+ */
+ public function implementedEvents(): array
+ {
+ /**
+ * @uses \CakeDC\QueueMonitor\Listener\QueueMonitorListener::handleMessageEvent()
+ * @uses \CakeDC\QueueMonitor\Listener\QueueMonitorListener::handleException()
+ * @uses \CakeDC\QueueMonitor\Listener\QueueMonitorListener::handleSeen()
+ */
+ return [
+ 'Processor.message.exception' => 'handleException',
+ 'Processor.message.invalid' => 'handleMessageEvent',
+ 'Processor.message.reject' => 'handleMessageEvent',
+ 'Processor.message.success' => 'handleMessageEvent',
+ 'Processor.message.failure' => 'handleMessageEvent',
+ 'Processor.message.seen' => 'handleSeen',
+ 'Processor.message.start' => 'handleMessageEvent',
+ ];
+ }
+
+ /**
+ * Handle event `Processor.message.exception`
+ */
+ public function handleException(EventInterface $event, ?Message $message, ?Throwable $exception = null): void
+ {
+ try {
+ $message = $this->validateQueueMessage($message);
+
+ if (!$exception) {
+ throw new QueueMonitorException(
+ 'Queue Exception is null, ensure that the queue job is set up correctly'
+ );
+ }
+
+ $this->storeEvent(
+ $event->getName(),
+ implode('::', $message->getTarget()),
+ $message->getOriginalMessage(),
+ $exception
+ );
+ } catch (Exception $e) {
+ $this->log("Unable to handle queue monitoring exception message event, reason: {$e->getMessage()}");
+ }
+ }
+
+ /**
+ * Handle events
+ * `Processor.message.invalid`
+ * `Processor.message.reject`
+ * `Processor.message.success`
+ * `Processor.message.failure`
+ * `Processor.message.start`
+ */
+ public function handleMessageEvent(EventInterface $event, ?Message $message): void
+ {
+ try {
+ $message = $this->validateQueueMessage($message);
+
+ $this->storeEvent(
+ $event->getName(),
+ implode('::', $message->getTarget()),
+ $message->getOriginalMessage()
+ );
+ } catch (Exception $e) {
+ $this->log('Unable to handle queue monitoring message event ' .
+ "`{$event->getName()}`, reason: {$e->getMessage()}");
+ }
+ }
+
+ /**
+ * Handle event `Processor.message.seen`
+ */
+ public function handleSeen(EventInterface $event, ?QueueMessage $queueMessage): void
+ {
+ try {
+ $queueMessage = $this->validateInteropQueueMessage($queueMessage);
+ $messageBody = json_decode($queueMessage->getBody(), true);
+ $target = is_array($messageBody) ?
+ implode('::', Hash::get($messageBody, 'class')) :
+ '';
+
+ $this->storeEvent(
+ $event->getName(),
+ $target,
+ $queueMessage
+ );
+ } catch (Exception $e) {
+ $this->log('Unable to handle queue monitoring message event ' .
+ "`{$event->getName()}`, reason: {$e->getMessage()}");
+ }
+ }
+
+ /**
+ * @throws \Exception
+ */
+ private function storeEvent(
+ string $eventName,
+ string $target,
+ QueueMessage $queueMessage,
+ ?Throwable $exception = null
+ ): void {
+ if (is_null($queueMessage->getMessageId())) {
+ throw new QueueMonitorException('Missing message id in queue message');
+ }
+ if (is_null($queueMessage->getTimestamp())) {
+ throw new QueueMonitorException('Missing timestamp in queue message');
+ }
+
+ /** @var \CakeDC\QueueMonitor\Model\Entity\Log $queueMonitoringLog */
+ $queueMonitoringLog = $this->QueueMonitoringLogs->newEmptyEntity();
+
+ $queueMonitoringLog->message_id = (string)$queueMessage->getMessageId();
+ $queueMonitoringLog->message_timestamp = DateTime::createFromTimestamp(
+ (int)$queueMessage->getTimestamp(),
+ 'UTC'
+ );
+ $queueMonitoringLog->event = MessageEvent::from($eventName)->getEventAsInt();
+ $queueMonitoringLog->job = $target;
+ $queueMonitoringLog->exception = $exception ? get_class($exception) : null;
+ $queueMonitoringLog->content = (string)json_encode([
+ 'body' => json_decode($queueMessage->getBody(), true),
+ 'headers' => $queueMessage->getHeaders(),
+ 'properties' => $queueMessage->getProperties(),
+ ]);
+
+ $this->QueueMonitoringLogs->saveOrFail($queueMonitoringLog);
+ }
+
+ /**
+ * Validate queue message
+ *
+ * @throws \CakeDC\QueueMonitor\Exception\QueueMonitorException
+ */
+ public function validateQueueMessage(?Message $message): Message
+ {
+ if (!($message instanceof Message) || !is_string($message->getOriginalMessage()->getMessageId())) {
+ throw new QueueMonitorException(
+ 'Message is not an instance of \Cake\Queue\Job\Message, ' .
+ 'ensure that the queue job is set up correctly'
+ );
+ }
+
+ return $message;
+ }
+
+ /**
+ * Validate Interop Queue Message
+ *
+ * @throws \CakeDC\QueueMonitor\Exception\QueueMonitorException
+ */
+ public function validateInteropQueueMessage(?QueueMessage $queueMessage): QueueMessage
+ {
+ if (!($queueMessage instanceof QueueMessage)) {
+ throw new QueueMonitorException(
+ 'Interop QueueMessage is not an instance of \Interop\Queue\Message, ' .
+ 'ensure that the queue job is set up correctly'
+ );
+ }
+
+ return $queueMessage;
+ }
+}
diff --git a/src/Model/Entity/Log.php b/src/Model/Entity/Log.php
new file mode 100644
index 0000000..721de25
--- /dev/null
+++ b/src/Model/Entity/Log.php
@@ -0,0 +1,51 @@
+
+ */
+ protected array $_accessible = [
+ 'created' => true,
+ 'message_id' => true,
+ 'message_timestamp' => true,
+ 'event' => true,
+ 'job' => true,
+ 'exception' => true,
+ 'content' => true,
+ ];
+}
diff --git a/src/Model/Status/MessageEvent.php b/src/Model/Status/MessageEvent.php
new file mode 100644
index 0000000..cef2ac3
--- /dev/null
+++ b/src/Model/Status/MessageEvent.php
@@ -0,0 +1,79 @@
+ 1,
+ self::Invalid => 2,
+ self::Start => 3,
+ self::Exception => 4,
+ self::Success => 5,
+ self::Reject => 6,
+ self::Failure => 7,
+ };
+ }
+
+ /**
+ * Get as options
+ */
+ public static function getOptions(): array
+ {
+ return collection(self::cases())
+ ->combine(
+ fn (MessageEvent $messageEvent) => $messageEvent->getEventAsInt(),
+ fn (MessageEvent $messageEvent) => $messageEvent->name
+ )
+ ->toArray();
+ }
+
+ /**
+ * Get events that indicates that job ended
+ */
+ public static function getNotEndingEvents(): array
+ {
+ return [
+ self::Seen,
+ self::Start,
+ ];
+ }
+
+ /**
+ * Get events that indicates that job ended (int array)
+ */
+ public static function getNotEndingEventsAsInts(): array
+ {
+ return collection(self::getNotEndingEvents())
+ ->map(fn (self $messageEvent): int => $messageEvent->getEventAsInt())
+ ->toList();
+ }
+}
diff --git a/src/Model/Table/LogsTable.php b/src/Model/Table/LogsTable.php
new file mode 100644
index 0000000..74d409c
--- /dev/null
+++ b/src/Model/Table/LogsTable.php
@@ -0,0 +1,92 @@
+setTable('queue_monitoring_logs');
+ $this->setDisplayField('event');
+ $this->setPrimaryKey('id');
+ $this->addBehavior('Timestamp');
+ }
+
+ /**
+ * Find entity with last event
+ *
+ * @uses \CakeDC\QueueMonitor\Model\Table\LogsTable::findLastEvent()
+ */
+ public function findWithLastEvent(SelectQuery $selectQuery): SelectQuery
+ {
+ return $selectQuery
+ ->find('lastEvent')
+ ->select($this);
+ }
+
+ /**
+ * Find last event
+ */
+ public function findLastEvent(SelectQuery $selectQuery): SelectQuery
+ {
+ return $selectQuery
+ ->select([
+ 'last_event' => $selectQuery->func()->max($this->aliasField('event'), ['integer']),
+ 'last_created' => $selectQuery->func()->max($this->aliasField('created'), ['datetime']),
+ 'message_timestamp',
+ ])
+ ->groupBy($this->aliasField('message_id'));
+ }
+
+ /**
+ * Find stuck jobs
+ *
+ * @throws \Exception
+ * @uses \CakeDC\QueueMonitor\Model\Table\LogsTable::findLastEvent()
+ */
+ public function findStuckJobs(SelectQuery $selectQuery, DateTime $olderThan): SelectQuery
+ {
+ return $selectQuery
+ ->find('lastEvent')
+ ->having(fn (QueryExpression $queryExpression): QueryExpression => $queryExpression
+ ->in('last_event', MessageEvent::getNotEndingEventsAsInts())
+ ->lte('last_created', $olderThan->toDateTimeString()));
+ }
+}
diff --git a/src/QueueMonitorPlugin.php b/src/QueueMonitorPlugin.php
new file mode 100644
index 0000000..3a98a02
--- /dev/null
+++ b/src/QueueMonitorPlugin.php
@@ -0,0 +1,69 @@
+add('queue_monitor purge', PurgeCommand::class)
+ ->add('queue_monitor notify', NotifyCommand::class);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function services(ContainerInterface $container): void
+ {
+ $container->add(QueueMonitoringService::class);
+ $container
+ ->add(PurgeCommand::class)
+ ->addArguments([
+ QueueMonitoringService::class,
+ ]);
+ $container
+ ->add(NotifyCommand::class)
+ ->addArguments([
+ QueueMonitoringService::class,
+ ]);
+ }
+}
diff --git a/src/Service/QueueMonitoringService.php b/src/Service/QueueMonitoringService.php
new file mode 100644
index 0000000..d345adc
--- /dev/null
+++ b/src/Service/QueueMonitoringService.php
@@ -0,0 +1,110 @@
+QueueMonitoringLogsTable = $this->fetchTable(LogsTable::class);
+ }
+
+ /**
+ * Get purge `to date` value
+ */
+ public function getPurgeToDate(int $daysOld): DateTime
+ {
+ return DateTime::now('UTC')
+ ->subDays($daysOld)
+ ->endOfDay();
+ }
+
+ /**
+ * Purge old logs
+ */
+ public function purgeLogs(int $daysOld): int
+ {
+ return $this->QueueMonitoringLogsTable->deleteAll(
+ fn (QueryExpression $queryExpression): QueryExpression => $queryExpression->lte(
+ $this->QueueMonitoringLogsTable->aliasField('message_timestamp'),
+ $this->getPurgeToDate($daysOld),
+ TableSchemaInterface::TYPE_DATETIME
+ )
+ );
+ }
+
+ /**
+ * get the list of jobs that are have last event older than 30 minutes and event type is not finished
+ * in any way (seen, start)
+ *
+ * @throws \Exception
+ */
+ public function notifyAboutLongRunningJobs(int $longJobsInMinutes): void
+ {
+ $olderThan = DateTime::now('UTC')->subMinutes($longJobsInMinutes);
+
+ /**
+ * @uses \CakeDC\QueueMonitor\Model\Table\LogsTable::findStuckJobs()
+ */
+ $runningJobs = $this->QueueMonitoringLogsTable
+ ->find(type: 'stuckJobs', olderThan: $olderThan)
+ ->all();
+
+ if ($runningJobs->count()) {
+ $notifyEmails = Configure::read('QueueMonitor.notificationRecipients');
+ if (!$notifyEmails) {
+ throw new QueueMonitorException(
+ 'Missing `QueueMonitor.notificationRecipients` configuration'
+ );
+ }
+ $notifyEmails = explode(',', $notifyEmails);
+ $mailerConfig = Configure::read('QueueMonitor.mailerConfig', 'default');
+ $mailer = new Mailer($mailerConfig);
+ foreach ($notifyEmails as $notifyEmail) {
+ if (!Validation::email($notifyEmail)) {
+ throw new QueueMonitorException(
+ 'Invalid notification email in `QueueMonitor.notificationRecipients`'
+ );
+ }
+ $mailer->addTo(trim($notifyEmail));
+ }
+ $mailer->setSubject('Emergency. There are jobs stuck in queue.')
+ ->deliver('This is automated message about queue job stuck in queue engine.' .
+ " \n\nThere are {$runningJobs->count()} stuck in queue for the last $longJobsInMinutes " .
+ 'minutes and more.');
+ }
+ }
+}
diff --git a/tests/TestCase/Listener/QueueMonitorListenerTest.php b/tests/TestCase/Listener/QueueMonitorListenerTest.php
new file mode 100644
index 0000000..467a76c
--- /dev/null
+++ b/tests/TestCase/Listener/QueueMonitorListenerTest.php
@@ -0,0 +1,163 @@
+ ['processReturnAck', InteropProcessor::ACK, 'Message processed successfully', 'Processor.message.success'],
+ 'null' => ['processReturnNull', InteropProcessor::ACK, 'Message processed successfully', 'Processor.message.success'],
+ 'reject' => ['processReturnReject', InteropProcessor::REJECT, 'Message processed with rejection', 'Processor.message.reject'],
+ 'requeue' => ['processReturnRequeue', InteropProcessor::REQUEUE, 'Message processed with failure, requeuing', 'Processor.message.failure'],
+ 'string' => ['processReturnString', InteropProcessor::REQUEUE, 'Message processed with failure, requeuing', 'Processor.message.failure'],
+ ];
+ }
+
+ /**
+ * Data provider used by testHandleException
+ *
+ * @return array[]
+ */
+ public static function dataProviderTestHandleException(): array
+ {
+ return [
+ ['processAndThrowException'],
+ ['processAndThrowTypeError'],
+ ['processAndThrowError'],
+ ];
+ }
+
+ /**
+ * Test process method
+ *
+ * @param string $jobMethod The method name to run
+ * @param string $expected The expected process result.
+ * @param string $logMessage The log message based on process result.
+ * @param string $dispacthedEvent The dispatched event based on process result.
+ * @dataProvider dataProviderTestProcess
+ * @return void
+ */
+ public function testProcess($jobMethod, $expected, $logMessage, $dispatchedEvent)
+ {
+ $messageBody = [
+ 'class' => [TestProcessor::class, $jobMethod],
+ 'args' => [],
+ ];
+ $connectionFactory = new NullConnectionFactory();
+ $context = $connectionFactory->createContext();
+ $queueMessage = new NullMessage(json_encode($messageBody));
+ $message = new Message($queueMessage, $context);
+
+ $events = new EventList();
+ $logger = new ArrayLog();
+ $processor = new Processor($logger);
+ $processor->getEventManager()->setEventList($events);
+ $processor->getEventManager()->on(new QueueMonitorListener());
+
+ $actual = $processor->process($queueMessage, $context);
+ $this->assertSame($expected, $actual);
+
+ $logs = $logger->read();
+ $this->assertCount(1, $logs);
+ $this->assertStringContainsString('debug', $logs[0]);
+ $this->assertStringContainsString($logMessage, $logs[0]);
+
+ $this->assertSame(3, $events->count());
+ $this->assertSame('Processor.message.seen', $events[0]->getName());
+ $this->assertEquals(['queueMessage' => $queueMessage], $events[0]->getData());
+
+ // Events should contain a message with the same payload.
+ $this->assertSame('Processor.message.start', $events[1]->getName());
+ $data = $events[1]->getData();
+ $this->assertArrayHasKey('message', $data);
+ $this->assertSame($message->jsonSerialize(), $data['message']->jsonSerialize());
+
+ $this->assertSame($dispatchedEvent, $events[2]->getName());
+ $data = $events[2]->getData();
+ $this->assertArrayHasKey('message', $data);
+ $this->assertSame($message->jsonSerialize(), $data['message']->jsonSerialize());
+ }
+
+ /**
+ * When processMessage() throws an exception, test that
+ * requeue will return.
+ *
+ * @return void
+ * @dataProvider dataProviderTestHandleException
+ */
+ public function testProcessWillRequeueOnException(string $method)
+ {
+ $messageBody = [
+ 'class' => [TestProcessor::class, $method],
+ 'data' => ['sample_data' => 'a value', 'key' => md5($method)],
+ ];
+ $connectionFactory = new NullConnectionFactory();
+ $context = $connectionFactory->createContext();
+ $queueMessage = new NullMessage(json_encode($messageBody));
+
+ $events = new EventList();
+ $logger = new ArrayLog();
+ $processor = new Processor($logger);
+ $processor->getEventManager()->setEventList($events);
+ $processor->getEventManager()->on(new QueueMonitorListener());
+
+ $result = $processor->process($queueMessage, $context);
+ $this->assertEquals(InteropProcessor::REQUEUE, $result);
+ }
+
+ /**
+ * Test processMessage method.
+ *
+ * @return void
+ */
+ public function testProcessMessage()
+ {
+ $messageBody = [
+ 'class' => [TestProcessor::class, 'processReturnAck'],
+ 'args' => [],
+ ];
+ $connectionFactory = new NullConnectionFactory();
+ $context = $connectionFactory->createContext();
+ $queueMessage = new NullMessage(json_encode($messageBody));
+ $message = new Message($queueMessage, $context);
+ $processor = new Processor();
+ $processor->getEventManager()->on(new QueueMonitorListener());
+
+ $result = $processor->processMessage($message);
+ $this->assertSame(InteropProcessor::ACK, $result);
+ $this->assertNotEmpty(TestProcessor::$lastProcessMessage);
+ }
+}
diff --git a/tests/TestCase/TestProcessor.php b/tests/TestCase/TestProcessor.php
new file mode 100644
index 0000000..1ce8fbc
--- /dev/null
+++ b/tests/TestCase/TestProcessor.php
@@ -0,0 +1,95 @@
+ [
+ 'engine' => 'File',
+ 'prefix' => 'cake_core_',
+ 'serialize' => true,
+ ],
+ '_cake_model_' => [
+ 'engine' => 'File',
+ 'prefix' => 'cake_model_',
+ 'serialize' => true,
+ ],
+]);
+
+Configure::write('debug', true);
+Configure::write('App', [
+ 'namespace' => 'TestApp',
+ 'encoding' => 'UTF-8',
+ 'paths' => [
+ 'templates' => [ROOT . 'templates' . DS],
+ ],
+]);
+
+Configure::write('Queue', [
+ 'default' => [
+ // Don't actually send messages anywhere.
+ 'url' => 'null:',
+
+ // The queue that will be used for sending messages. default: default
+ // This can be overriden when queuing or processing messages
+ 'queue' => 'default',
+
+ // The name of a configured logger, default: null
+ 'logger' => 'stdout',
+ ],
+]);
+
+// Ensure default test connection is defined
+if (!getenv('db_dsn')) {
+ putenv('db_dsn=sqlite:///:memory:');
+}
+
+ConnectionManager::setConfig('test', [
+ 'url' => getenv('db_dsn'),
+ 'timezone' => 'UTC',
+]);
+
+(new Migrator())->run();
diff --git a/tests/schema.sql b/tests/schema.sql
new file mode 100644
index 0000000..08835b7
--- /dev/null
+++ b/tests/schema.sql
@@ -0,0 +1 @@
+-- Test database schema for QueueMonitorTemplate