From c79a0356d4231f0d19f46df8653e42dd65be46d0 Mon Sep 17 00:00:00 2001 From: Adam Rusinowski Date: Thu, 18 Apr 2024 16:51:10 +0200 Subject: [PATCH] initial commit of CakePHP 5.0 version --- .editorconfig | 26 +++ .github/workflows/ci.yml | 21 ++ .phive/phars.xml | 5 + CHANGELOG.md | 18 ++ CONTRIBUTING.md | 6 + LICENSE | 21 ++ README.md | 115 ++++++++++ composer.json | 74 ++++++ ...231213090000_CreateQueueMonitoringLogs.php | 108 +++++++++ phpcs.xml | 6 + phpstan-baseline.neon | 0 phpstan.neon | 10 + phpunit.xml.dist | 23 ++ psalm.xml | 34 +++ src/Command/NotifyCommand.php | 79 +++++++ src/Command/PurgeCommand.php | 89 ++++++++ src/Exception/QueueMonitorException.php | 22 ++ src/Listener/QueueMonitorListener.php | 212 ++++++++++++++++++ src/Model/Entity/Log.php | 51 +++++ src/Model/Status/MessageEvent.php | 79 +++++++ src/Model/Table/LogsTable.php | 92 ++++++++ src/QueueMonitorPlugin.php | 69 ++++++ src/Service/QueueMonitoringService.php | 110 +++++++++ .../Listener/QueueMonitorListenerTest.php | 163 ++++++++++++++ tests/TestCase/TestProcessor.php | 95 ++++++++ tests/bootstrap.php | 104 +++++++++ tests/schema.sql | 1 + 27 files changed, 1633 insertions(+) create mode 100644 .editorconfig create mode 100644 .github/workflows/ci.yml create mode 100644 .phive/phars.xml create mode 100644 CHANGELOG.md create mode 100644 CONTRIBUTING.md create mode 100644 LICENSE create mode 100644 README.md create mode 100644 composer.json create mode 100644 config/Migrations/20231213090000_CreateQueueMonitoringLogs.php create mode 100644 phpcs.xml create mode 100644 phpstan-baseline.neon create mode 100644 phpstan.neon create mode 100644 phpunit.xml.dist create mode 100644 psalm.xml create mode 100644 src/Command/NotifyCommand.php create mode 100644 src/Command/PurgeCommand.php create mode 100644 src/Exception/QueueMonitorException.php create mode 100644 src/Listener/QueueMonitorListener.php create mode 100644 src/Model/Entity/Log.php create mode 100644 src/Model/Status/MessageEvent.php create mode 100644 src/Model/Table/LogsTable.php create mode 100644 src/QueueMonitorPlugin.php create mode 100644 src/Service/QueueMonitoringService.php create mode 100644 tests/TestCase/Listener/QueueMonitorListenerTest.php create mode 100644 tests/TestCase/TestProcessor.php create mode 100644 tests/bootstrap.php create mode 100644 tests/schema.sql diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..ae35877 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,26 @@ +; This file is for unifying the coding style for different editors and IDEs. +; More information at http://editorconfig.org + +root = true + +[*] +indent_style = space +indent_size = 4 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +[*.bat] +end_of_line = crlf + +[*.yml] +indent_size = 2 + +[*.xml] +indent_size = 2 + +[Makefile] +indent_style = tab + +[*.neon] +indent_style = tab \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..6c30e15 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,21 @@ +name: CI + +on: + push: + branches: + - 2.next-cake5 + pull_request: + branches: + - '*' + +permissions: + contents: read + +jobs: + testsuite: + uses: cakephp/.github/.github/workflows/testsuite-with-db.yml@5.x + secrets: inherit + + cs-stan: + uses: cakephp/.github/.github/workflows/cs-stan.yml@5.x + secrets: inherit diff --git a/.phive/phars.xml b/.phive/phars.xml new file mode 100644 index 0000000..1e8e15f --- /dev/null +++ b/.phive/phars.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..c7536ff --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,18 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [2.0] - 2024-04-17 + +### Added + +- First release for CakePHP 5.0 + +## [1.0] - 2024-04-17 + +### Added + +- First release for CakePHP 4.4 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..7e20154 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,6 @@ +Contributing +============ + +This repository follows the [CakeDC Plugin Standard](https://www.cakedc.com/plugin-standard). If you'd like to +contribute new features, enhancements or bug fixes to the plugin, please read our +[Contribution Guidelines](https://www.cakedc.com/contribution-guidelines) for detailed instructions. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a64ec0e --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Cake Development Corporation + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..ef83662 --- /dev/null +++ b/README.md @@ -0,0 +1,115 @@ +# CakeDC Queue Monitor Plugin for CakePHP + +## Versions and branches +| CakePHP | CakeDC Queue Monitor Plugin | Tag | Notes | +|:-------:|:--------------------------------------------------------------------------:|:------------:|:-------| +| ^5.0 | [2.0.0](https://github.com/CakeDC/cakephp-queue-monitor/tree/2.next-cake5) | 2.next-cake5 | stable | +| ^4.4 | [1.0.0](https://github.com/CakeDC/cakephp-queue-monitor/tree/1.next-cake4) | 1.next-cake4 | stable | + +## Overview + +The CakeDC Queue Monitor Plugin adds the ability to monitor jobs in queues that are handled by the +[CakePHP Queue Plugin](https://github.com/cakephp/queue). This plugin checks the duration of work of +individual Jobs and sends a notification when this time is exceeded by a configurable value. + +## Requirements +* CakePHP 5.0 +* PHP 8.1+ + +## Installation + +You can install this plugin into your CakePHP application using [composer](https://getcomposer.org). + +The recommended way to install composer package is: +``` +composer require cakedc/queue-monitor +``` + +## Configuration + +Add QueueMonitorPlugin to your `Application::bootstrap`: +```php +use Cake\Http\BaseApplication; +use CakeDC\QueueMonitor\QueueMonitorPlugin; + +class Application extends BaseApplication +{ + // ... + + public function bootstrap(): void + { + parent::bootstrap(); + + $this->addPlugin(QueueMonitorPlugin::class); + } + + // ... +} + +``` + +Set up the QueueMonitor configuration in your `config/app_local.php`: +```php +// ... + 'QueueMonitor' => [ + // mailer config, the default is `default` mailer, you can ommit + // this setting if you use default value + 'mailerConfig' => 'myCustomMailer', + + // the default is 30 minutes, you can ommit this setting if you + // use the default value + 'longJobInMinutes' => 45, + + // the default is 30 days, you can ommit this setting if you + // its advised to set this value correctly after queue usage analysis to avoid + // high space usage in db + 'purgeLogsOlderThanDays' => 10, + + // comma separated list of recipients of notification about long running queue jobs + 'notificationRecipients' => 'recipient1@yourdomain.com,recipient2@yourdomain.com,recipient3@yourdomain.com', + ], +// ... +``` + +Run the required migrations +```shell +bin/cake migrations migrate -p CakeDC/QueueMonitor +``` + +For each queue configuration add `listener` setting +```php +// ... + 'Queue' => [ + 'default' => [ + // ... + 'listener' => \CakeDC\QueueMonitor\Listener\QueueMonitorListener::class, + // ... + ] + ], +// ... +``` + +## Notification command + +To set up notifications when there are long running or possible stuck jobs please use command +```shell +bin/cake queue_monitor notify +``` + +This command will send notification emails to recipients specified in `QueueMonitor.notificationRecipients`. Best is +to use it as a cronjob + +## Purge command + +The logs table may grow overtime, to keep it slim you can use the purge command: +```shell +bin/cake queue_monitor purge +``` + +This command will purge logs older than value specified in `QueueMonitor.purgeLogsOlderThanDays`, the value is in +days, the default is 30 days. Best is to use it as a cronjob + +## Important + +Make sure your Job classes have a property value of maxAttempts because if it's missing, the log table can quickly +grow to gigantic size in the event of an uncaught exception in Job, Job is re-queued indefinitely in such a case. diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..b7e381c --- /dev/null +++ b/composer.json @@ -0,0 +1,74 @@ +{ + "name": "cakedc/queue-monitor", + "description": "CakeDC Queue Monitor plugin for CakePHP", + "type": "cakephp-plugin", + "license": "MIT", + "keywords": [ + "cakephp", + "queue", + "queue monitoring", + "queue monitor" + ], + "homepage": "https://github.com/CakeDC/cakephp-queue-monitor", + "authors": [ + { + "name": "CakeDC", + "homepage": "https://www.cakedc.com", + "role": "Author" + }, + { + "name": "Others", + "homepage": "https://github.com/CakeDC/cakephp-queue-monitor/graphs/contributors" + } + ], + "support": { + "issues": "https://github.com/CakeDC/cakephp-queue-monitor/issues", + "source": "https://github.com/CakeDC/cakephp-queue-monitor" + }, + "require": { + "php": ">=8.1", + "cakephp/cakephp": "^5.0.1", + "cakephp/queue": "^2.0", + "ext-json": "*" + }, + "require-dev": { + "phpunit/phpunit": "^10.1.0", + "cakephp/migrations": "^4.0.0", + "cakephp/cakephp-codesniffer": "^5.1" + }, + "autoload": { + "psr-4": { + "CakeDC\\QueueMonitor\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "CakeDC\\QueueMonitor\\Test\\": "tests/", + "Cake\\Test\\": "vendor/cakephp/cakephp/tests/" + } + }, + "scripts": { + "check": [ + "@cs-check", + "@test" + ], + "cs-check": "phpcs --colors --parallel=16 -p src/ tests/", + "cs-fix": "phpcbf --colors --parallel=16 -p src/ tests/", + "phpstan": "tools/phpstan analyse", + "psalm": "tools/psalm --show-info=false", + "stan": [ + "@phpstan", + "@psalm" + ], + "stan-tests": "phpstan.phar analyze -c tests/phpstan.neon", + "stan-baseline": "phpstan.phar --generate-baseline", + "stan-setup": "phive install", + "test": "phpunit", + "test-coverage": "phpunit --coverage-clover=clover.xml" + }, + "config": { + "allow-plugins": { + "dealerdirect/phpcodesniffer-composer-installer": true + } + } +} diff --git a/config/Migrations/20231213090000_CreateQueueMonitoringLogs.php b/config/Migrations/20231213090000_CreateQueueMonitoringLogs.php new file mode 100644 index 0000000..335b709 --- /dev/null +++ b/config/Migrations/20231213090000_CreateQueueMonitoringLogs.php @@ -0,0 +1,108 @@ +table('queue_monitoring_logs', ['id' => false, 'primary_key' => ['id']]) + ->addColumn('id', 'uuid', [ + 'default' => null, + 'limit' => null, + 'null' => false, + ]) + ->addColumn('created', 'datetime', [ + 'default' => null, + 'limit' => null, + 'null' => false, + 'precision' => 6, + ]) + ->addColumn('message_id', 'uuid', [ + 'default' => null, + 'limit' => null, + 'null' => false, + ]) + ->addColumn('message_timestamp', 'datetime', [ + 'default' => null, + 'limit' => null, + 'null' => false, + ]) + ->addColumn('event', 'tinyinteger', [ + 'default' => null, + 'limit' => null, + 'null' => false, + ]) + ->addColumn('job', 'string', [ + 'default' => null, + 'limit' => 255, + 'null' => true, + ]) + ->addColumn('exception', 'string', [ + 'default' => null, + 'limit' => 255, + 'null' => true, + ]) + ->addColumn('content', 'text', [ + 'default' => null, + 'limit' => 4294967295, + 'null' => false, + ]) + ->addIndex( + [ + 'created', + ], + [ + 'name' => 'logs_created_index', + ] + ) + ->addIndex( + [ + 'event', + ], + [ + 'name' => 'logs_event_index', + ] + ) + ->addIndex( + [ + 'message_id', + ], + [ + 'name' => 'logs_message_id_index', + ] + ) + ->create(); + } + + /** + * Down Method. + * + * More information on this method is available here: + * https://book.cakephp.org/phinx/0/en/migrations.html#the-down-method + * + * @return void + */ + public function down(): void + { + $this->table('queue_monitoring_logs')->drop()->save(); + } +} diff --git a/phpcs.xml b/phpcs.xml new file mode 100644 index 0000000..e46a08d --- /dev/null +++ b/phpcs.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon new file mode 100644 index 0000000..e69de29 diff --git a/phpstan.neon b/phpstan.neon new file mode 100644 index 0000000..7dc051f --- /dev/null +++ b/phpstan.neon @@ -0,0 +1,10 @@ +includes: + - phpstan-baseline.neon +parameters: + level: 8 + checkMissingIterableValueType: false + checkGenericClassInNonGenericObjectType: false + bootstrapFiles: + - tests/bootstrap.php + paths: + - src/ diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..031fa04 --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,23 @@ + + + + + + + + + + + + tests/TestCase/ + + + + + + + + src/ + + + diff --git a/psalm.xml b/psalm.xml new file mode 100644 index 0000000..2119034 --- /dev/null +++ b/psalm.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/Command/NotifyCommand.php b/src/Command/NotifyCommand.php new file mode 100644 index 0000000..28c72c8 --- /dev/null +++ b/src/Command/NotifyCommand.php @@ -0,0 +1,79 @@ +setDescription(__('Queue Monitoring notifier')); + } + + /** + * @inheritDoc + */ + public function execute(Arguments $args, ConsoleIo $io) + { + try { + $this->queueMonitoringService->notifyAboutLongRunningJobs( + (int)Configure::read( + 'QueueMonitor.longJobInMinutes', + self::DEFAULT_LONG_JOB_IN_MINUTES + ) + ); + } catch (Exception $e) { + $this->log("Failed to send queue stuck notifications, reason: {$e->getMessage()}"); + + return self::CODE_ERROR; + } + + return self::CODE_SUCCESS; + } +} diff --git a/src/Command/PurgeCommand.php b/src/Command/PurgeCommand.php new file mode 100644 index 0000000..70ab430 --- /dev/null +++ b/src/Command/PurgeCommand.php @@ -0,0 +1,89 @@ +setDescription(__('Queue Monitoring log purger')); + } + + /** + * @inheritDoc + */ + public function execute(Arguments $args, ConsoleIo $io) + { + $purgeToDate = $this->queueMonitoringService->getPurgeToDate( + (int)Configure::read( + 'QueueMonitor.purgeLogsOlderThanDays', + self::DEFAULT_PURGE_DAYS_OLD + ) + ); + $this->log( + "Purging queue logs older than {$purgeToDate->toDateTimeString()} UTC", + LogLevel::INFO + ); + try { + $rowCount = $this->queueMonitoringService->purgeLogs(self::DEFAULT_PURGE_DAYS_OLD); + $this->log( + "Purged $rowCount queue messages older than {$purgeToDate->toDateTimeString()} UTC", + LogLevel::INFO + ); + } catch (Exception $e) { + $this->log("Failed puring `queue stuck` logs, reason: {$e->getMessage()}"); + + return self::CODE_ERROR; + } + + return self::CODE_SUCCESS; + } +} diff --git a/src/Exception/QueueMonitorException.php b/src/Exception/QueueMonitorException.php new file mode 100644 index 0000000..911566b --- /dev/null +++ b/src/Exception/QueueMonitorException.php @@ -0,0 +1,22 @@ +QueueMonitoringLogs = $this->fetchTable(LogsTable::class); + } + + /** + * Implemented events + */ + public function implementedEvents(): array + { + /** + * @uses \CakeDC\QueueMonitor\Listener\QueueMonitorListener::handleMessageEvent() + * @uses \CakeDC\QueueMonitor\Listener\QueueMonitorListener::handleException() + * @uses \CakeDC\QueueMonitor\Listener\QueueMonitorListener::handleSeen() + */ + return [ + 'Processor.message.exception' => 'handleException', + 'Processor.message.invalid' => 'handleMessageEvent', + 'Processor.message.reject' => 'handleMessageEvent', + 'Processor.message.success' => 'handleMessageEvent', + 'Processor.message.failure' => 'handleMessageEvent', + 'Processor.message.seen' => 'handleSeen', + 'Processor.message.start' => 'handleMessageEvent', + ]; + } + + /** + * Handle event `Processor.message.exception` + */ + public function handleException(EventInterface $event, ?Message $message, ?Throwable $exception = null): void + { + try { + $message = $this->validateQueueMessage($message); + + if (!$exception) { + throw new QueueMonitorException( + 'Queue Exception is null, ensure that the queue job is set up correctly' + ); + } + + $this->storeEvent( + $event->getName(), + implode('::', $message->getTarget()), + $message->getOriginalMessage(), + $exception + ); + } catch (Exception $e) { + $this->log("Unable to handle queue monitoring exception message event, reason: {$e->getMessage()}"); + } + } + + /** + * Handle events + * `Processor.message.invalid` + * `Processor.message.reject` + * `Processor.message.success` + * `Processor.message.failure` + * `Processor.message.start` + */ + public function handleMessageEvent(EventInterface $event, ?Message $message): void + { + try { + $message = $this->validateQueueMessage($message); + + $this->storeEvent( + $event->getName(), + implode('::', $message->getTarget()), + $message->getOriginalMessage() + ); + } catch (Exception $e) { + $this->log('Unable to handle queue monitoring message event ' . + "`{$event->getName()}`, reason: {$e->getMessage()}"); + } + } + + /** + * Handle event `Processor.message.seen` + */ + public function handleSeen(EventInterface $event, ?QueueMessage $queueMessage): void + { + try { + $queueMessage = $this->validateInteropQueueMessage($queueMessage); + $messageBody = json_decode($queueMessage->getBody(), true); + $target = is_array($messageBody) ? + implode('::', Hash::get($messageBody, 'class')) : + ''; + + $this->storeEvent( + $event->getName(), + $target, + $queueMessage + ); + } catch (Exception $e) { + $this->log('Unable to handle queue monitoring message event ' . + "`{$event->getName()}`, reason: {$e->getMessage()}"); + } + } + + /** + * @throws \Exception + */ + private function storeEvent( + string $eventName, + string $target, + QueueMessage $queueMessage, + ?Throwable $exception = null + ): void { + if (is_null($queueMessage->getMessageId())) { + throw new QueueMonitorException('Missing message id in queue message'); + } + if (is_null($queueMessage->getTimestamp())) { + throw new QueueMonitorException('Missing timestamp in queue message'); + } + + /** @var \CakeDC\QueueMonitor\Model\Entity\Log $queueMonitoringLog */ + $queueMonitoringLog = $this->QueueMonitoringLogs->newEmptyEntity(); + + $queueMonitoringLog->message_id = (string)$queueMessage->getMessageId(); + $queueMonitoringLog->message_timestamp = DateTime::createFromTimestamp( + (int)$queueMessage->getTimestamp(), + 'UTC' + ); + $queueMonitoringLog->event = MessageEvent::from($eventName)->getEventAsInt(); + $queueMonitoringLog->job = $target; + $queueMonitoringLog->exception = $exception ? get_class($exception) : null; + $queueMonitoringLog->content = (string)json_encode([ + 'body' => json_decode($queueMessage->getBody(), true), + 'headers' => $queueMessage->getHeaders(), + 'properties' => $queueMessage->getProperties(), + ]); + + $this->QueueMonitoringLogs->saveOrFail($queueMonitoringLog); + } + + /** + * Validate queue message + * + * @throws \CakeDC\QueueMonitor\Exception\QueueMonitorException + */ + public function validateQueueMessage(?Message $message): Message + { + if (!($message instanceof Message) || !is_string($message->getOriginalMessage()->getMessageId())) { + throw new QueueMonitorException( + 'Message is not an instance of \Cake\Queue\Job\Message, ' . + 'ensure that the queue job is set up correctly' + ); + } + + return $message; + } + + /** + * Validate Interop Queue Message + * + * @throws \CakeDC\QueueMonitor\Exception\QueueMonitorException + */ + public function validateInteropQueueMessage(?QueueMessage $queueMessage): QueueMessage + { + if (!($queueMessage instanceof QueueMessage)) { + throw new QueueMonitorException( + 'Interop QueueMessage is not an instance of \Interop\Queue\Message, ' . + 'ensure that the queue job is set up correctly' + ); + } + + return $queueMessage; + } +} diff --git a/src/Model/Entity/Log.php b/src/Model/Entity/Log.php new file mode 100644 index 0000000..721de25 --- /dev/null +++ b/src/Model/Entity/Log.php @@ -0,0 +1,51 @@ + + */ + protected array $_accessible = [ + 'created' => true, + 'message_id' => true, + 'message_timestamp' => true, + 'event' => true, + 'job' => true, + 'exception' => true, + 'content' => true, + ]; +} diff --git a/src/Model/Status/MessageEvent.php b/src/Model/Status/MessageEvent.php new file mode 100644 index 0000000..cef2ac3 --- /dev/null +++ b/src/Model/Status/MessageEvent.php @@ -0,0 +1,79 @@ + 1, + self::Invalid => 2, + self::Start => 3, + self::Exception => 4, + self::Success => 5, + self::Reject => 6, + self::Failure => 7, + }; + } + + /** + * Get as options + */ + public static function getOptions(): array + { + return collection(self::cases()) + ->combine( + fn (MessageEvent $messageEvent) => $messageEvent->getEventAsInt(), + fn (MessageEvent $messageEvent) => $messageEvent->name + ) + ->toArray(); + } + + /** + * Get events that indicates that job ended + */ + public static function getNotEndingEvents(): array + { + return [ + self::Seen, + self::Start, + ]; + } + + /** + * Get events that indicates that job ended (int array) + */ + public static function getNotEndingEventsAsInts(): array + { + return collection(self::getNotEndingEvents()) + ->map(fn (self $messageEvent): int => $messageEvent->getEventAsInt()) + ->toList(); + } +} diff --git a/src/Model/Table/LogsTable.php b/src/Model/Table/LogsTable.php new file mode 100644 index 0000000..74d409c --- /dev/null +++ b/src/Model/Table/LogsTable.php @@ -0,0 +1,92 @@ +setTable('queue_monitoring_logs'); + $this->setDisplayField('event'); + $this->setPrimaryKey('id'); + $this->addBehavior('Timestamp'); + } + + /** + * Find entity with last event + * + * @uses \CakeDC\QueueMonitor\Model\Table\LogsTable::findLastEvent() + */ + public function findWithLastEvent(SelectQuery $selectQuery): SelectQuery + { + return $selectQuery + ->find('lastEvent') + ->select($this); + } + + /** + * Find last event + */ + public function findLastEvent(SelectQuery $selectQuery): SelectQuery + { + return $selectQuery + ->select([ + 'last_event' => $selectQuery->func()->max($this->aliasField('event'), ['integer']), + 'last_created' => $selectQuery->func()->max($this->aliasField('created'), ['datetime']), + 'message_timestamp', + ]) + ->groupBy($this->aliasField('message_id')); + } + + /** + * Find stuck jobs + * + * @throws \Exception + * @uses \CakeDC\QueueMonitor\Model\Table\LogsTable::findLastEvent() + */ + public function findStuckJobs(SelectQuery $selectQuery, DateTime $olderThan): SelectQuery + { + return $selectQuery + ->find('lastEvent') + ->having(fn (QueryExpression $queryExpression): QueryExpression => $queryExpression + ->in('last_event', MessageEvent::getNotEndingEventsAsInts()) + ->lte('last_created', $olderThan->toDateTimeString())); + } +} diff --git a/src/QueueMonitorPlugin.php b/src/QueueMonitorPlugin.php new file mode 100644 index 0000000..3a98a02 --- /dev/null +++ b/src/QueueMonitorPlugin.php @@ -0,0 +1,69 @@ +add('queue_monitor purge', PurgeCommand::class) + ->add('queue_monitor notify', NotifyCommand::class); + } + + /** + * @inheritDoc + */ + public function services(ContainerInterface $container): void + { + $container->add(QueueMonitoringService::class); + $container + ->add(PurgeCommand::class) + ->addArguments([ + QueueMonitoringService::class, + ]); + $container + ->add(NotifyCommand::class) + ->addArguments([ + QueueMonitoringService::class, + ]); + } +} diff --git a/src/Service/QueueMonitoringService.php b/src/Service/QueueMonitoringService.php new file mode 100644 index 0000000..d345adc --- /dev/null +++ b/src/Service/QueueMonitoringService.php @@ -0,0 +1,110 @@ +QueueMonitoringLogsTable = $this->fetchTable(LogsTable::class); + } + + /** + * Get purge `to date` value + */ + public function getPurgeToDate(int $daysOld): DateTime + { + return DateTime::now('UTC') + ->subDays($daysOld) + ->endOfDay(); + } + + /** + * Purge old logs + */ + public function purgeLogs(int $daysOld): int + { + return $this->QueueMonitoringLogsTable->deleteAll( + fn (QueryExpression $queryExpression): QueryExpression => $queryExpression->lte( + $this->QueueMonitoringLogsTable->aliasField('message_timestamp'), + $this->getPurgeToDate($daysOld), + TableSchemaInterface::TYPE_DATETIME + ) + ); + } + + /** + * get the list of jobs that are have last event older than 30 minutes and event type is not finished + * in any way (seen, start) + * + * @throws \Exception + */ + public function notifyAboutLongRunningJobs(int $longJobsInMinutes): void + { + $olderThan = DateTime::now('UTC')->subMinutes($longJobsInMinutes); + + /** + * @uses \CakeDC\QueueMonitor\Model\Table\LogsTable::findStuckJobs() + */ + $runningJobs = $this->QueueMonitoringLogsTable + ->find(type: 'stuckJobs', olderThan: $olderThan) + ->all(); + + if ($runningJobs->count()) { + $notifyEmails = Configure::read('QueueMonitor.notificationRecipients'); + if (!$notifyEmails) { + throw new QueueMonitorException( + 'Missing `QueueMonitor.notificationRecipients` configuration' + ); + } + $notifyEmails = explode(',', $notifyEmails); + $mailerConfig = Configure::read('QueueMonitor.mailerConfig', 'default'); + $mailer = new Mailer($mailerConfig); + foreach ($notifyEmails as $notifyEmail) { + if (!Validation::email($notifyEmail)) { + throw new QueueMonitorException( + 'Invalid notification email in `QueueMonitor.notificationRecipients`' + ); + } + $mailer->addTo(trim($notifyEmail)); + } + $mailer->setSubject('Emergency. There are jobs stuck in queue.') + ->deliver('This is automated message about queue job stuck in queue engine.' . + " \n\nThere are {$runningJobs->count()} stuck in queue for the last $longJobsInMinutes " . + 'minutes and more.'); + } + } +} diff --git a/tests/TestCase/Listener/QueueMonitorListenerTest.php b/tests/TestCase/Listener/QueueMonitorListenerTest.php new file mode 100644 index 0000000..467a76c --- /dev/null +++ b/tests/TestCase/Listener/QueueMonitorListenerTest.php @@ -0,0 +1,163 @@ + ['processReturnAck', InteropProcessor::ACK, 'Message processed successfully', 'Processor.message.success'], + 'null' => ['processReturnNull', InteropProcessor::ACK, 'Message processed successfully', 'Processor.message.success'], + 'reject' => ['processReturnReject', InteropProcessor::REJECT, 'Message processed with rejection', 'Processor.message.reject'], + 'requeue' => ['processReturnRequeue', InteropProcessor::REQUEUE, 'Message processed with failure, requeuing', 'Processor.message.failure'], + 'string' => ['processReturnString', InteropProcessor::REQUEUE, 'Message processed with failure, requeuing', 'Processor.message.failure'], + ]; + } + + /** + * Data provider used by testHandleException + * + * @return array[] + */ + public static function dataProviderTestHandleException(): array + { + return [ + ['processAndThrowException'], + ['processAndThrowTypeError'], + ['processAndThrowError'], + ]; + } + + /** + * Test process method + * + * @param string $jobMethod The method name to run + * @param string $expected The expected process result. + * @param string $logMessage The log message based on process result. + * @param string $dispacthedEvent The dispatched event based on process result. + * @dataProvider dataProviderTestProcess + * @return void + */ + public function testProcess($jobMethod, $expected, $logMessage, $dispatchedEvent) + { + $messageBody = [ + 'class' => [TestProcessor::class, $jobMethod], + 'args' => [], + ]; + $connectionFactory = new NullConnectionFactory(); + $context = $connectionFactory->createContext(); + $queueMessage = new NullMessage(json_encode($messageBody)); + $message = new Message($queueMessage, $context); + + $events = new EventList(); + $logger = new ArrayLog(); + $processor = new Processor($logger); + $processor->getEventManager()->setEventList($events); + $processor->getEventManager()->on(new QueueMonitorListener()); + + $actual = $processor->process($queueMessage, $context); + $this->assertSame($expected, $actual); + + $logs = $logger->read(); + $this->assertCount(1, $logs); + $this->assertStringContainsString('debug', $logs[0]); + $this->assertStringContainsString($logMessage, $logs[0]); + + $this->assertSame(3, $events->count()); + $this->assertSame('Processor.message.seen', $events[0]->getName()); + $this->assertEquals(['queueMessage' => $queueMessage], $events[0]->getData()); + + // Events should contain a message with the same payload. + $this->assertSame('Processor.message.start', $events[1]->getName()); + $data = $events[1]->getData(); + $this->assertArrayHasKey('message', $data); + $this->assertSame($message->jsonSerialize(), $data['message']->jsonSerialize()); + + $this->assertSame($dispatchedEvent, $events[2]->getName()); + $data = $events[2]->getData(); + $this->assertArrayHasKey('message', $data); + $this->assertSame($message->jsonSerialize(), $data['message']->jsonSerialize()); + } + + /** + * When processMessage() throws an exception, test that + * requeue will return. + * + * @return void + * @dataProvider dataProviderTestHandleException + */ + public function testProcessWillRequeueOnException(string $method) + { + $messageBody = [ + 'class' => [TestProcessor::class, $method], + 'data' => ['sample_data' => 'a value', 'key' => md5($method)], + ]; + $connectionFactory = new NullConnectionFactory(); + $context = $connectionFactory->createContext(); + $queueMessage = new NullMessage(json_encode($messageBody)); + + $events = new EventList(); + $logger = new ArrayLog(); + $processor = new Processor($logger); + $processor->getEventManager()->setEventList($events); + $processor->getEventManager()->on(new QueueMonitorListener()); + + $result = $processor->process($queueMessage, $context); + $this->assertEquals(InteropProcessor::REQUEUE, $result); + } + + /** + * Test processMessage method. + * + * @return void + */ + public function testProcessMessage() + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + $connectionFactory = new NullConnectionFactory(); + $context = $connectionFactory->createContext(); + $queueMessage = new NullMessage(json_encode($messageBody)); + $message = new Message($queueMessage, $context); + $processor = new Processor(); + $processor->getEventManager()->on(new QueueMonitorListener()); + + $result = $processor->processMessage($message); + $this->assertSame(InteropProcessor::ACK, $result); + $this->assertNotEmpty(TestProcessor::$lastProcessMessage); + } +} diff --git a/tests/TestCase/TestProcessor.php b/tests/TestCase/TestProcessor.php new file mode 100644 index 0000000..1ce8fbc --- /dev/null +++ b/tests/TestCase/TestProcessor.php @@ -0,0 +1,95 @@ + [ + 'engine' => 'File', + 'prefix' => 'cake_core_', + 'serialize' => true, + ], + '_cake_model_' => [ + 'engine' => 'File', + 'prefix' => 'cake_model_', + 'serialize' => true, + ], +]); + +Configure::write('debug', true); +Configure::write('App', [ + 'namespace' => 'TestApp', + 'encoding' => 'UTF-8', + 'paths' => [ + 'templates' => [ROOT . 'templates' . DS], + ], +]); + +Configure::write('Queue', [ + 'default' => [ + // Don't actually send messages anywhere. + 'url' => 'null:', + + // The queue that will be used for sending messages. default: default + // This can be overriden when queuing or processing messages + 'queue' => 'default', + + // The name of a configured logger, default: null + 'logger' => 'stdout', + ], +]); + +// Ensure default test connection is defined +if (!getenv('db_dsn')) { + putenv('db_dsn=sqlite:///:memory:'); +} + +ConnectionManager::setConfig('test', [ + 'url' => getenv('db_dsn'), + 'timezone' => 'UTC', +]); + +(new Migrator())->run(); diff --git a/tests/schema.sql b/tests/schema.sql new file mode 100644 index 0000000..08835b7 --- /dev/null +++ b/tests/schema.sql @@ -0,0 +1 @@ +-- Test database schema for QueueMonitorTemplate