diff --git a/CHANGELOG.md b/CHANGELOG.md index 65647c14..49f6ae74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,13 +2,19 @@ All relevant changes to `mateusjunges/laravel-kafka` will be documented here. -## [2023-10-xx v2.0.0](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.x...v2.0.0) +## [2024-01-xx v2.0.0](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.x...v2.0.0) - Add the ability to queue message handlers by @mateusjunges in [#177](https://github.com/mateusjunges/laravel-kafka/pull/177) - Add after and before consuming callbacks by @mateusjunges and @ebrahimradi in [#192](https://github.com/mateusjunges/laravel-kafka/pull/192) - Add simple events to kafka producer and consumers by @mateusjunges in [#193](https://github.com/mateusjunges/laravel-kafka/pull/193) - Append throwable/exception info when sending a message to DLQ by @mateusjunges in [#194](https://github.com/mateusjunges/laravel-kafka/pull/194) - Add the ability to assign partitions to a consumer by @mateusjunges in [#234](https://github.com/mateusjunges/laravel-kafka/pull/234) +## [2024-01-09 v1.13.6](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.6...v1.13.7) +* Add timeout feature for consumers by @mihaileu in [#233](https://github.com/mateusjunges/laravel-kafka/pull/233) + +## [2023-12-07 v1.13.6](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.5...v1.13.6) +* Remove internal annotation from interface by @mateusjunges + ## [2023-10-24 v1.13.5](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.4...v1.13.5) * Fixed default securityProtocol config by @SergkeiM in [#215](https://github.com/mateusjunges/laravel-kafka/pull/215) diff --git a/docs/consuming-messages/6-configuring-consumer-options.md b/docs/consuming-messages/6-configuring-consumer-options.md index 8c6e388e..0640d83a 100644 --- a/docs/consuming-messages/6-configuring-consumer-options.md +++ b/docs/consuming-messages/6-configuring-consumer-options.md @@ -41,6 +41,14 @@ kafka consumer: $consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxMessages(2); ``` +### Configuring the max time when a consumer can process messages +If you want to consume a limited amount of time, you can use the `withMaxTime` method to set the max number of seconds for +kafka consumer to process messages: + +```php +$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxTime(3600); +``` + ### Setting Kafka configuration options To set configuration options, you can use two methods: `withOptions`, passing an array of option and option value or, using the `withOption method and passing two arguments, the option name and the option value. diff --git a/src/Config/Config.php b/src/Config/Config.php index 563b50f1..08f7a527 100644 --- a/src/Config/Config.php +++ b/src/Config/Config.php @@ -81,6 +81,7 @@ public function __construct( private readonly array $callbacks = [], private readonly array $beforeConsumingCallbacks = [], private readonly array $afterConsumingCallbacks = [], + private readonly int $maxTime = 0, private readonly array $partitionAssignment = [], ) { } @@ -115,6 +116,11 @@ public function getMaxMessages(): int return $this->maxMessages; } + public function getMaxTime(): int + { + return $this->maxTime; + } + public function isAutoCommit(): bool { return $this->autoCommit; diff --git a/src/Console/Commands/ConsumerCommand.php b/src/Console/Commands/ConsumerCommand.php index 8ab5cdfb..0991e2bd 100644 --- a/src/Console/Commands/ConsumerCommand.php +++ b/src/Console/Commands/ConsumerCommand.php @@ -19,6 +19,7 @@ class ConsumerCommand extends Command {--commit=1} {--dlq=? : The Dead Letter Queue} {--maxMessage=? : The max number of messages that should be handled} + {--maxTime=0 : The max number of seconds that a consumer should run } {--securityProtocol=?}'; /* @var string $description */ @@ -56,7 +57,7 @@ public function handle() return; } - $parsedOptions = array_map(fn ($value) => $value === '?' ? null : $value, $this->options()); + $parsedOptions = array_map($this->parseOptions(...), $this->options()); $options = new Options($parsedOptions, $this->config); @@ -72,7 +73,8 @@ public function handle() consumer: app($consumer), sasl: $options->getSasl(), dlq: $options->getDlq(), - maxMessages: $options->getMaxMessages() + maxMessages: $options->getMaxMessages(), + maxTime: $options->getMaxTime(), ); /** @var Consumer $consumer */ @@ -83,4 +85,17 @@ public function handle() $consumer->consume(); } + + private function parseOptions(int|string|null $option): int|string|null + { + if ($option === '?') { + return null; + } + + if (is_numeric($option)) { + return (int) $option; + } + + return $option; + } } diff --git a/src/Console/Commands/KafkaConsumer/Options.php b/src/Console/Commands/KafkaConsumer/Options.php index c3a2c413..fb676793 100644 --- a/src/Console/Commands/KafkaConsumer/Options.php +++ b/src/Console/Commands/KafkaConsumer/Options.php @@ -14,6 +14,7 @@ final class Options private int|string|null $commit = 1; private ?string $dlq = null; private int $maxMessages = -1; + private int $maxTime = 0; private ?string $securityProtocol = 'plaintext'; private readonly ?string $saslUsername; private readonly ?string $saslPassword; @@ -68,6 +69,11 @@ public function getMaxMessages(): int return $this->maxMessages >= 1 ? $this->maxMessages : -1; } + public function getMaxTime(): int + { + return $this->maxTime; + } + #[Pure] public function getSasl(): ?Sasl { diff --git a/src/Consumers/Consumer.php b/src/Consumers/Consumer.php index acd6454d..1aa67ef6 100644 --- a/src/Consumers/Consumer.php +++ b/src/Consumers/Consumer.php @@ -21,6 +21,7 @@ use Junges\Kafka\Exceptions\ConsumerException; use Junges\Kafka\MessageCounter; use Junges\Kafka\Retryable; +use Junges\Kafka\Support\InfiniteTimer; use Junges\Kafka\Support\Timer; use RdKafka\Conf; use RdKafka\KafkaConsumer; @@ -82,6 +83,7 @@ public function consume(): void { $this->cancelStopConsume(); $this->configureRestartTimer(); + $stopTimer = $this->configureStopTimer(); if ($this->supportAsyncSignals()) { $this->listenForSignals(); @@ -117,7 +119,7 @@ public function consume(): void $this->retryable->retry(fn () => $this->doConsume()); $this->runAfterConsumingCallbacks(); $this->checkForRestart(); - } while (! $this->maxMessagesLimitReached() && ! $this->stopRequested); + } while (! $this->maxMessagesLimitReached() && ! $stopTimer->isTimedOut() && ! $this->stopRequested); if ($this->shouldRunStopConsumingCallback()) { $callback = $this->whenStopConsuming; @@ -367,6 +369,18 @@ private function maxMessagesLimitReached(): bool return $this->messageCounter->maxMessagesLimitReached(); } + public function configureStopTimer(): Timer + { + $stopTimer = new Timer(); + if ($this->config->getMaxTime() === 0) { + $stopTimer = new InfiniteTimer(); + } + + $stopTimer->start($this->config->getMaxTime() * 1000); + + return $stopTimer; + } + /** * Handle the message. * diff --git a/src/Consumers/ConsumerBuilder.php b/src/Consumers/ConsumerBuilder.php index 6083da98..0e0adfd9 100644 --- a/src/Consumers/ConsumerBuilder.php +++ b/src/Consumers/ConsumerBuilder.php @@ -29,6 +29,7 @@ class ConsumerBuilder implements ConsumerBuilderContract protected int $commit; protected Closure | Handler $handler; protected int $maxMessages; + protected int $maxTime = 0; protected int $maxCommitRetries; /** @var list */ @@ -163,6 +164,16 @@ public function withMaxMessages(int $maxMessages): self return $this; } + /** + * @inheritDoc + */ + public function withMaxTime(int $maxTime): self + { + $this->maxTime = $maxTime; + + return $this; + } + /** @inheritDoc */ public function withMaxCommitRetries(int $maxCommitRetries): self { @@ -317,6 +328,7 @@ public function build(): MessageConsumer callbacks: $this->callbacks, beforeConsumingCallbacks: $this->beforeConsumingCallbacks, afterConsumingCallbacks: $this->afterConsumingCallbacks, + maxTime: $this->maxTime, partitionAssignment: $this->partitionAssignment, ); diff --git a/src/Contracts/ConsumerBuilder.php b/src/Contracts/ConsumerBuilder.php index 98644c08..d9a0e941 100644 --- a/src/Contracts/ConsumerBuilder.php +++ b/src/Contracts/ConsumerBuilder.php @@ -35,6 +35,20 @@ public function usingCommitterFactory(CommitterFactory $committerFactory): self; public function withMaxMessages(int $maxMessages): self; /** Specify the max retries attempts. */ + /** + * Define the max number seconds that a consumer should run + * + * @param int $maxTime + * @return \Junges\Kafka\Consumers\ConsumerBuilder + */ + public function withMaxTime(int $maxTime): self; + + /** + * Specify the max retries attempts. + * + * @param int $maxCommitRetries + * @return \Junges\Kafka\Consumers\ConsumerBuilder + */ public function withMaxCommitRetries(int $maxCommitRetries): self; /** diff --git a/src/Support/InfiniteTimer.php b/src/Support/InfiniteTimer.php new file mode 100644 index 00000000..253d470f --- /dev/null +++ b/src/Support/InfiniteTimer.php @@ -0,0 +1,11 @@ +assertEquals(1, $consumer->consumedMessagesCount()); } + public function testCanStopConsumeIfMaxTimeReached() + { + $message = new Message(); + $message->err = 0; + $message->key = 'key'; + $message->topic_name = 'test'; + $message->payload = '{"body": "message payload"}'; + $message->offset = 0; + $message->partition = 1; + $message->headers = []; + + $message2 = new Message(); + $message2->err = 0; + $message2->key = 'key2'; + $message2->topic_name = 'test2'; + $message2->payload = '{"body": "message payload2"}'; + $message2->offset = 0; + $message2->partition = 1; + $message2->headers = []; + + $this->mockConsumerWithMessage($message, $message2); + $this->mockProducer(); + + $fakeHandler = new CallableConsumer( + function (ConsumerMessage $message) { + sleep(2); + }, + [] + ); + + $config = new Config( + broker: 'broker', + topics: ['test-topic'], + securityProtocol: 'security', + commit: 1, + groupId: 'group', + consumer: $fakeHandler, + sasl: null, + dlq: null, + maxMessages: 2, + maxTime: 1, + ); + + $consumer = new Consumer($config, new JsonDeserializer()); + $consumer->consume(); + + //finally only one message should be consumed + $this->assertEquals(1, $consumer->consumedMessagesCount()); + } + public function testItRunCallbacksBeforeConsume(): void { $fakeHandler = new FakeHandler();