From cdb48cdfa40983fb66d4f7f816533327c3d1fed7 Mon Sep 17 00:00:00 2001 From: Mateus Junges Date: Tue, 20 Feb 2024 10:56:43 -0300 Subject: [PATCH] Set `onStopConsuming` while building consumer and make `MessageConsumer` accessible to handler functions (#260) * Pass instance of message consumer to handler functions * Set onStopConsuming callback on builder instance instead of consumer * Improve docs * Improve docs * wip * Add note about message consumer not available for queued handlers * Fix docs * Update upgrade guide --- docs/6-upgrade-guide.md | 24 +++++++++++++++--- docs/advanced-usage/2-graceful-shutdown.md | 6 ++--- docs/advanced-usage/6-stopping-a-consumer.md | 25 +++++++++++++++++++ docs/consuming-messages/10-class-structure.md | 4 +-- .../11-queueable-handlers.md | 5 ++++ docs/consuming-messages/5-message-handlers.md | 6 ++--- .../9-handling-message-batch.md | 2 +- src/Concerns/HandleConsumedMessage.php | 5 ++-- src/Config/Config.php | 7 ++++++ src/Consumers/Builder.php | 10 ++++++++ src/Consumers/CallableBatchConsumer.php | 9 ++++--- src/Consumers/CallableConsumer.php | 9 ++++--- src/Consumers/Consumer.php | 18 ++++++------- src/Consumers/DispatchQueuedHandler.php | 7 +++++- src/Consumers/NullBatchConsumer.php | 5 ++-- src/Contracts/BatchMessageConsumer.php | 2 +- src/Contracts/Consumer.php | 2 +- src/Contracts/ConsumerBuilder.php | 3 +++ src/Contracts/MessageConsumer.php | 3 --- src/Support/Testing/Fakes/BuilderFake.php | 1 + src/Support/Testing/Fakes/ConsumerFake.php | 14 +++-------- tests/Consumers/CallableConsumerTest.php | 6 ++++- tests/Consumers/ConsumerTest.php | 13 +++++----- tests/Fakes/FakeHandler.php | 3 ++- tests/KafkaFakeTest.php | 16 +++++------- 25 files changed, 135 insertions(+), 70 deletions(-) create mode 100644 docs/advanced-usage/6-stopping-a-consumer.md diff --git a/docs/6-upgrade-guide.md b/docs/6-upgrade-guide.md index e94639fc..05e60e64 100644 --- a/docs/6-upgrade-guide.md +++ b/docs/6-upgrade-guide.md @@ -22,9 +22,27 @@ The `withSasl` method now accepts all `SASL` parameters instead of a `Sasl` obje public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT'); ``` +#### Setting `onStopConsuming` callbacks + +To set `onStopConsuming` callbacks you need to define them while building the consumer, instead of after calling the `build` method as in `v1.13.x`: + +```diff +$consumer = Kafka::consumer(['topic']) + ->withConsumerGroupId('group') + ->withHandler(new Handler) ++ ->onStopConsuming(static function () { ++ // Do something when the consumer stop consuming messages ++ }) + ->build() +- ->onStopConsuming(static function () { +- // Do something when the consumer stop consuming messages +- }) +``` + + ### Updating dependencies -**PHP 8.1 Required** +**PHP 8.2 Required** -This package now requires PHP 8.1 or higher. +This package now requires PHP 8.2 or higher. -You can use tools such as [rector](https://github.com/rectorphp/rector) to upgrade your app to PHP 8.1. +You can use tools such as [rector](https://github.com/rectorphp/rector) to upgrade your app to PHP 8.2. diff --git a/docs/advanced-usage/2-graceful-shutdown.md b/docs/advanced-usage/2-graceful-shutdown.md index 7921b428..b5f6b81c 100644 --- a/docs/advanced-usage/2-graceful-shutdown.md +++ b/docs/advanced-usage/2-graceful-shutdown.md @@ -5,7 +5,7 @@ weight: 2 Stopping consumers is very useful if you want to ensure you don't kill a process halfway through processing a consumed message. -Starting from version `1.12.x` of this package, consumers automatically listen to the `SIGTERM`, `SIGINT` and `SIQUIT` signals, which means you can easily stop your consumers using those signals. +Consumers automatically listen to the `SIGTERM`, `SIGINT` and `SIQUIT` signals, which means you can easily stop your consumers using those signals. ### Running callbacks when the consumer stops If your app requires that you run sum sort of processing when the consumers stop processing messages, you can use the `onStopConsume` method, available on the `\Junges\Kafka\Contracts\CanConsumeMessages` interface. This method accepts a `Closure` that will run once your consumer stops consuming. @@ -16,12 +16,12 @@ use Junges\Kafka\Facades\Kafka; $consumer = Kafka::consumer(['topic']) ->withConsumerGroupId('group') ->withHandler(new Handler) - ->build() ->onStopConsuming(static function () { // Do something when the consumer stop consuming messages }) + ->build(); $consumer->consume(); ``` -> You will require the [ Process Control Extension ](https://www.php.net/manual/en/book.pcntl.php) to be installed to utilise this feature. +> This features requires [ Process Control Extension ](https://www.php.net/manual/en/book.pcntl.php) to be installed. diff --git a/docs/advanced-usage/6-stopping-a-consumer.md b/docs/advanced-usage/6-stopping-a-consumer.md new file mode 100644 index 00000000..5e0a23ac --- /dev/null +++ b/docs/advanced-usage/6-stopping-a-consumer.md @@ -0,0 +1,25 @@ +--- +title: Stop consumer on demand +weight: 6 +--- + +Sometimes, you may want to stop your consumer based on a given message or any other condition. + +You can do it by adding a calling `stopConsuming()` method on the `MessageConsumer` instance that is passed as the +second argument of your message handler: + +```php +$consumer = \Junges\Kafka\Facades\Kafka::consumer(['topic']) + ->withConsumerGroupId('group') + ->stopAfterLastMessage() + ->withHandler(static function (\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) { + if ($someCondition) { + $consumer->stopConsuming(); + } + }) + ->build(); + +$consumer->consume(); +``` + +The `onStopConsuming` callback will be executed before stopping your consumer. \ No newline at end of file diff --git a/docs/consuming-messages/10-class-structure.md b/docs/consuming-messages/10-class-structure.md index 452d95d4..10d01292 100644 --- a/docs/consuming-messages/10-class-structure.md +++ b/docs/consuming-messages/10-class-structure.md @@ -11,7 +11,7 @@ Consumer classes are very simple, and it is basically a Laravel Command class. T namespace App\Console\Commands\Consumers; use Illuminate\Console\Command; -use Junges\Kafka\Facades\Kafka; +use Junges\Kafka\Contracts\MessageConsumer;use Junges\Kafka\Facades\Kafka; use Junges\Kafka\Contracts\ConsumerMessage; class MyTopicConsumer extends Command @@ -25,7 +25,7 @@ class MyTopicConsumer extends Command $consumer = Kafka::consumer(['my-topic']) ->withBrokers('localhost:8092') ->withAutoCommit() - ->withHandler(function(ConsumerMessage $message) { + ->withHandler(function(ConsumerMessage $message, MessageConsumer $consumer) { // Handle your message here }) ->build(); diff --git a/docs/consuming-messages/11-queueable-handlers.md b/docs/consuming-messages/11-queueable-handlers.md index 97f763b1..213b649c 100644 --- a/docs/consuming-messages/11-queueable-handlers.md +++ b/docs/consuming-messages/11-queueable-handlers.md @@ -23,4 +23,9 @@ class Handler implements HandlerContract, ShouldQueue } ``` +As you can see on the `__invoke` method, queued handlers does not have access to a `MessageConsumer` instance when handling the message, +because it's running on a laravel queue and there are no actions that can be performed asynchronously on Kafka message consumer. + After creating your handler class, you can use it just as a normal handler, and `laravel-kafka` will know how to handle it under the hoods 😄. + + diff --git a/docs/consuming-messages/5-message-handlers.md b/docs/consuming-messages/5-message-handlers.md index 0e100194..291635a4 100644 --- a/docs/consuming-messages/5-message-handlers.md +++ b/docs/consuming-messages/5-message-handlers.md @@ -10,7 +10,7 @@ You can use an invokable class or a simple callback. Use the `withHandler` metho $consumer = \Junges\Kafka\Facades\Kafka::consumer(); // Using callback: -$consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message) { +$consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) { // Handle your message here }); ``` @@ -20,7 +20,7 @@ Or, using an invokable class: ```php class Handler { - public function __invoke(\Junges\Kafka\Contracts\ConsumerMessage $message){ + public function __invoke(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) { // Handle your message here } } @@ -28,7 +28,7 @@ class Handler $consumer = \Junges\Kafka\Facades\Kafka::consumer()->withHandler(new Handler) ``` -The `KafkaConsumerMessage` contract gives you some handy methods to get the message properties: +The `ConsumerMessage` contract gives you some handy methods to get the message properties: - `getKey()`: Returns the Kafka Message Key - `getTopicName()`: Returns the topic where the message was published diff --git a/docs/consuming-messages/9-handling-message-batch.md b/docs/consuming-messages/9-handling-message-batch.md index 30028f12..ac8bbd39 100644 --- a/docs/consuming-messages/9-handling-message-batch.md +++ b/docs/consuming-messages/9-handling-message-batch.md @@ -15,7 +15,7 @@ $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->enableBatching() ->withBatchSizeLimit(1000) ->withBatchReleaseInterval(1500) - ->withHandler(function (\Illuminate\Support\Collection $collection) { + ->withHandler(function (\Illuminate\Support\Collection $collection, \Junges\Kafka\Contracts\MessageConsumer $consumer) { // Handle batch }) ->build(); diff --git a/src/Concerns/HandleConsumedMessage.php b/src/Concerns/HandleConsumedMessage.php index 39426e56..c06475cd 100644 --- a/src/Concerns/HandleConsumedMessage.php +++ b/src/Concerns/HandleConsumedMessage.php @@ -5,6 +5,7 @@ use Closure; use Junges\Kafka\Contracts\ConsumerMessage; use Junges\Kafka\Contracts\Handler; +use Junges\Kafka\Contracts\MessageConsumer; /** * @internal @@ -12,7 +13,7 @@ */ trait HandleConsumedMessage { - private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure $handler, array $middlewares = []): void + private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure $handler, ?MessageConsumer $consumer = null, array $middlewares = []): void { $middlewares = array_map($this->wrapMiddleware(...), $middlewares); $middlewares = array_reverse($middlewares); @@ -21,6 +22,6 @@ private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure $handler = $middleware($handler); } - $handler($message); + $handler($message, $consumer); } } diff --git a/src/Config/Config.php b/src/Config/Config.php index 08f7a527..88ad2686 100644 --- a/src/Config/Config.php +++ b/src/Config/Config.php @@ -2,6 +2,7 @@ namespace Junges\Kafka\Config; +use Closure; use JetBrains\PhpStorm\Pure; use Junges\Kafka\Contracts\Consumer; use Junges\Kafka\Contracts\HandlesBatchConfiguration; @@ -83,6 +84,7 @@ public function __construct( private readonly array $afterConsumingCallbacks = [], private readonly int $maxTime = 0, private readonly array $partitionAssignment = [], + private readonly ?Closure $whenStopConsuming = null, ) { } @@ -225,4 +227,9 @@ public function getPartitionAssigment(): array { return $this->partitionAssignment; } + + public function getWhenStopConsumingCallback(): ?Closure + { + return $this->whenStopConsuming; + } } diff --git a/src/Consumers/Builder.php b/src/Consumers/Builder.php index 0da8a326..8bfc1fa1 100644 --- a/src/Consumers/Builder.php +++ b/src/Consumers/Builder.php @@ -57,6 +57,8 @@ class Builder implements ConsumerBuilderContract /** @var array */ protected array $partitionAssignment = []; + protected ?Closure $onStopConsuming = null; + protected function __construct(protected string $brokers, array $topics = [], protected ?string $groupId = null) { if (count($topics) > 0) { @@ -314,6 +316,13 @@ public function assignPartitions(array $partitionAssignment): self return $this; } + public function onStopConsuming(callable $onStopConsuming): self + { + $this->onStopConsuming = $onStopConsuming(...); + + return $this; + } + /** @inheritDoc */ public function build(): MessageConsumer { @@ -337,6 +346,7 @@ public function build(): MessageConsumer afterConsumingCallbacks: $this->afterConsumingCallbacks, maxTime: $this->maxTime, partitionAssignment: $this->partitionAssignment, + whenStopConsuming: $this->onStopConsuming, ); return new Consumer($config, $this->deserializer, $this->committerFactory); diff --git a/src/Consumers/CallableBatchConsumer.php b/src/Consumers/CallableBatchConsumer.php index 2185bf19..d126d848 100644 --- a/src/Consumers/CallableBatchConsumer.php +++ b/src/Consumers/CallableBatchConsumer.php @@ -5,15 +5,16 @@ use Closure; use Illuminate\Support\Collection; use Junges\Kafka\Contracts\BatchMessageConsumer; +use Junges\Kafka\Contracts\MessageConsumer; -class CallableBatchConsumer implements BatchMessageConsumer +readonly class CallableBatchConsumer implements BatchMessageConsumer { - public function __construct(private readonly Closure $batchHandler) + public function __construct(private Closure $batchHandler) { } - public function handle(Collection $collection): void + public function handle(Collection $collection, MessageConsumer $consumer): void { - ($this->batchHandler)($collection); + ($this->batchHandler)($collection, $consumer); } } diff --git a/src/Consumers/CallableConsumer.php b/src/Consumers/CallableConsumer.php index 05fee499..357c8d57 100644 --- a/src/Consumers/CallableConsumer.php +++ b/src/Consumers/CallableConsumer.php @@ -11,6 +11,7 @@ use Junges\Kafka\Contracts\Consumer; use Junges\Kafka\Contracts\ConsumerMessage; use Junges\Kafka\Contracts\Handler; +use Junges\Kafka\Contracts\MessageConsumer; class CallableConsumer extends Consumer { @@ -29,7 +30,7 @@ public function __construct(private Closure|Handler $handler, private readonly a } /** Handle the received message. */ - public function handle(ConsumerMessage $message): void + public function handle(ConsumerMessage $message, MessageConsumer $consumer): void { // If the message handler should be queued, we will dispatch a job to handle this message. // Otherwise, the message will be handled synchronously. @@ -39,7 +40,7 @@ public function handle(ConsumerMessage $message): void return; } - $this->handleMessageSynchronously($message); + $this->handleMessageSynchronously($message, $consumer); } private function shouldQueueHandler(): bool @@ -47,9 +48,9 @@ private function shouldQueueHandler(): bool return $this->handler instanceof ShouldQueue; } - private function handleMessageSynchronously(ConsumerMessage $message): void + private function handleMessageSynchronously(ConsumerMessage $message, MessageConsumer $consumer): void { - $this->handleConsumedMessage($message, $this->handler, $this->middlewares); + $this->handleConsumedMessage($message, $this->handler, $consumer, $this->middlewares); } /** diff --git a/src/Consumers/Consumer.php b/src/Consumers/Consumer.php index 6ee9edf6..09aa4b81 100644 --- a/src/Consumers/Consumer.php +++ b/src/Consumers/Consumer.php @@ -61,7 +61,7 @@ class Consumer implements MessageConsumer private readonly Retryable $retryable; private readonly CommitterFactory $committerFactory; private bool $stopRequested = false; - private ?Closure $whenStopConsuming = null; + private ?Closure $whenStopConsuming; protected int $lastRestart = 0; protected Timer $restartTimer; private Dispatcher $dispatcher; @@ -74,6 +74,7 @@ public function __construct(private readonly Config $config, private readonly Me $this->committerFactory = $committerFactory ?? new DefaultCommitterFactory($this->messageCounter); $this->dispatcher = App::make(Dispatcher::class); + $this->whenStopConsuming = $this->config->getWhenStopConsumingCallback(); } /** @@ -170,14 +171,6 @@ public function stopConsuming(): void $this->stopRequested = true; } - /** @inheritdoc */ - public function onStopConsuming(?Closure $onStopConsuming = null): self - { - $this->whenStopConsuming = $onStopConsuming; - - return $this; - } - /** Will cancel the stopConsume request initiated by calling the stopConsume method */ public function cancelStopConsume(): void { @@ -232,7 +225,10 @@ private function executeMessage(Message $message): void // was received and will be consumed as soon as a consumer is available to process it. $this->dispatcher->dispatch(new StartedConsumingMessage($consumedMessage)); - $this->config->getConsumer()->handle($consumedMessage = $this->deserializer->deserialize($consumedMessage)); + $this->config->getConsumer()->handle( + $consumedMessage = $this->deserializer->deserialize($consumedMessage), + $this + ); $success = true; // Dispatch an event informing that a message was consumed. @@ -283,7 +279,7 @@ private function executeBatch(Collection $collection): void $consumedMessages = $collection ->map(fn (Message $message) => $this->deserializer->deserialize($this->getConsumerMessage($message))); - $this->config->getBatchConfig()->getConsumer()->handle($consumedMessages); + $this->config->getBatchConfig()->getConsumer()->handle($consumedMessages, $this); $collection->each(fn (Message $message) => $this->commit($message, true)); } catch (Throwable $throwable) { diff --git a/src/Consumers/DispatchQueuedHandler.php b/src/Consumers/DispatchQueuedHandler.php index 7de0543b..ba635cbc 100644 --- a/src/Consumers/DispatchQueuedHandler.php +++ b/src/Consumers/DispatchQueuedHandler.php @@ -28,6 +28,11 @@ public function __construct( public function handle(): void { - $this->handleConsumedMessage($this->message, $this->handler, $this->middlewares); + // Queued handlers does not have access to an instance of the MessageConsumer class. + $this->handleConsumedMessage( + message: $this->message, + handler: $this->handler, + middlewares: $this->middlewares + ); } } diff --git a/src/Consumers/NullBatchConsumer.php b/src/Consumers/NullBatchConsumer.php index e4dddb6e..9cba29ed 100644 --- a/src/Consumers/NullBatchConsumer.php +++ b/src/Consumers/NullBatchConsumer.php @@ -4,10 +4,11 @@ use Illuminate\Support\Collection; use Junges\Kafka\Contracts\BatchMessageConsumer; +use Junges\Kafka\Contracts\MessageConsumer; -class NullBatchConsumer implements BatchMessageConsumer +readonly class NullBatchConsumer implements BatchMessageConsumer { - public function handle(Collection $collection): void + public function handle(Collection $collection, MessageConsumer $consumer): void { } } diff --git a/src/Contracts/BatchMessageConsumer.php b/src/Contracts/BatchMessageConsumer.php index 7c4759e0..3ac124b5 100644 --- a/src/Contracts/BatchMessageConsumer.php +++ b/src/Contracts/BatchMessageConsumer.php @@ -7,5 +7,5 @@ interface BatchMessageConsumer { /** Handles messages released from batch repository. */ - public function handle(Collection $collection): void; + public function handle(Collection $collection, MessageConsumer $consumer): void; } diff --git a/src/Contracts/Consumer.php b/src/Contracts/Consumer.php index fa167aa7..a4db7e3c 100644 --- a/src/Contracts/Consumer.php +++ b/src/Contracts/Consumer.php @@ -7,7 +7,7 @@ abstract class Consumer { - abstract public function handle(ConsumerMessage $message): void; + abstract public function handle(ConsumerMessage $message, MessageConsumer $consumer): void; /** @throws Throwable */ public function failed(string $message, string $topic, Throwable $exception): never diff --git a/src/Contracts/ConsumerBuilder.php b/src/Contracts/ConsumerBuilder.php index 13562582..9f8df785 100644 --- a/src/Contracts/ConsumerBuilder.php +++ b/src/Contracts/ConsumerBuilder.php @@ -16,6 +16,9 @@ public function subscribe(...$topics): self; /** Assigns a set of partitions this consumer should consume from. */ public function assignPartitions(array $partitionAssignment): self; + /** Defines a callback to be executed when consumer stops consuming messages. */ + public function onStopConsuming(callable $onStopConsuming): self; + /** Set the brokers the kafka consumer should use. */ public function withBrokers(?string $brokers): self; diff --git a/src/Contracts/MessageConsumer.php b/src/Contracts/MessageConsumer.php index 714331ee..1cf4abab 100644 --- a/src/Contracts/MessageConsumer.php +++ b/src/Contracts/MessageConsumer.php @@ -21,7 +21,4 @@ public function cancelStopConsume(): void; /** Count the number of messages consumed by this consumer */ public function consumedMessagesCount(): int; - - /** Defines a callable that will run when the consumer stops consuming messages. */ - public function onStopConsuming(?Closure $onStopConsuming = null): self; } diff --git a/src/Support/Testing/Fakes/BuilderFake.php b/src/Support/Testing/Fakes/BuilderFake.php index f54bfb46..7fa386a7 100644 --- a/src/Support/Testing/Fakes/BuilderFake.php +++ b/src/Support/Testing/Fakes/BuilderFake.php @@ -56,6 +56,7 @@ public function build(): MessageConsumer batchConfig: $this->getBatchConfig(), stopAfterLastMessage: $this->stopAfterLastMessage, callbacks: $this->callbacks, + whenStopConsuming: $this->onStopConsuming, ); return new ConsumerFake( diff --git a/src/Support/Testing/Fakes/ConsumerFake.php b/src/Support/Testing/Fakes/ConsumerFake.php index b094b476..8f047c8f 100644 --- a/src/Support/Testing/Fakes/ConsumerFake.php +++ b/src/Support/Testing/Fakes/ConsumerFake.php @@ -26,6 +26,7 @@ public function __construct( ) { $this->messageCounter = new MessageCounter($config->getMaxMessages()); $this->batchConfig = $this->config->getBatchConfig(); + $this->whenStopConsuming = $this->config->getWhenStopConsumingCallback(); } /** Consume messages from a kafka topic in loop. */ @@ -85,15 +86,6 @@ private function shouldStopConsuming(): bool return $this->maxMessagesLimitReached() || $this->stopRequested; } - /** Consume messages */ - /** @inheritdoc */ - public function onStopConsuming(?Closure $onStopConsuming = null): self - { - $this->whenStopConsuming = $onStopConsuming; - - return $this; - } - /** * Consume messages * @@ -153,13 +145,13 @@ private function executeBatch(Collection $collection): void fn (Message $message) => $this->getConsumerMessage($message) ); - $this->config->getBatchConfig()->getConsumer()->handle($consumedMessages); + $this->config->getBatchConfig()->getConsumer()->handle($consumedMessages, $this); } /** Handle the message. */ private function handleMessage(ConsumerMessage $message): void { - $this->config->getConsumer()->handle($message); + $this->config->getConsumer()->handle($message, $this); $this->messageCounter->add(); } diff --git a/tests/Consumers/CallableConsumerTest.php b/tests/Consumers/CallableConsumerTest.php index b8aa7042..b7a8ac5c 100644 --- a/tests/Consumers/CallableConsumerTest.php +++ b/tests/Consumers/CallableConsumerTest.php @@ -5,7 +5,9 @@ use Illuminate\Support\Str; use Junges\Kafka\Consumers\CallableConsumer; use Junges\Kafka\Contracts\ConsumerMessage; +use Junges\Kafka\Contracts\MessageConsumer; use Junges\Kafka\Tests\LaravelKafkaTestCase; +use Mockery as m; use RdKafka\Message; use stdClass; @@ -24,6 +26,8 @@ public function testItDecodesMessages(): void $message->headers = []; $message->offset = 0; + $messageConsumerMock = m::mock(MessageConsumer::class); + $consumer = new CallableConsumer($this->handleMessage(...), [ function (ConsumerMessage $message, callable $next): void { $decoded = json_decode($message->getBody()); @@ -35,7 +39,7 @@ function (stdClass $message, callable $next): void { }, ]); - $consumer->handle($this->getConsumerMessage($message)); + $consumer->handle($this->getConsumerMessage($message), $messageConsumerMock); } public function handleMessage(array $data): void diff --git a/tests/Consumers/ConsumerTest.php b/tests/Consumers/ConsumerTest.php index eb3bdfd8..9db40950 100644 --- a/tests/Consumers/ConsumerTest.php +++ b/tests/Consumers/ConsumerTest.php @@ -177,12 +177,13 @@ public function testCanStopConsume(): void $this->mockProducer(); $this->stoppableConsumer = Kafka::consumer(['test']) - ->withHandler(function (ConsumerMessage $message) { - if ($message->getKey() === 'key2' && $this->stoppableConsumer) { - $this->stoppableConsumer->onStopConsuming(function () { - $this->stoppableConsumerStopped = true; - $this->stoppedConsumerMessage = 'Consumer stopped.'; - })->stopConsuming(); + ->onStopConsuming(function () { + $this->stoppableConsumerStopped = true; + $this->stoppedConsumerMessage = 'Consumer stopped.'; + }) + ->withHandler(function (ConsumerMessage $message, MessageConsumer $consumer) { + if ($message->getKey() === 'key2') { + $consumer->stopConsuming(); } }) ->withAutoCommit() diff --git a/tests/Fakes/FakeHandler.php b/tests/Fakes/FakeHandler.php index 4861dfca..76665cd9 100644 --- a/tests/Fakes/FakeHandler.php +++ b/tests/Fakes/FakeHandler.php @@ -4,6 +4,7 @@ use Junges\Kafka\Contracts\Consumer; use Junges\Kafka\Contracts\ConsumerMessage; +use Junges\Kafka\Contracts\MessageConsumer; final class FakeHandler extends Consumer { @@ -14,7 +15,7 @@ public function lastMessage(): ?ConsumerMessage return $this->lastMessage; } - public function handle(ConsumerMessage $message): void + public function handle(ConsumerMessage $message, MessageConsumer $consumer): void { $this->lastMessage = $message; } diff --git a/tests/KafkaFakeTest.php b/tests/KafkaFakeTest.php index 0cce7142..583a4e20 100644 --- a/tests/KafkaFakeTest.php +++ b/tests/KafkaFakeTest.php @@ -386,14 +386,15 @@ public function testStopFakeConsumer(): void //stop consumer after first message $this->consumer->stopConsuming(); }) - ->build() ->onStopConsuming(function () use (&$stopped) { $stopped = true; - }); + }) + ->build(); $this->consumer->consume(); + //testing stop callback - $this->assertTrue((bool)$stopped); + $this->assertTrue($stopped); //should have consumed only one message $this->assertEquals(1, $this->consumer->consumedMessagesCount()); } @@ -560,22 +561,17 @@ public function testStopFakeBatchConsumer(): void Kafka::shouldReceiveMessages($messages); - $stopped = false; $this->consumer = Kafka::consumer(['test-topic']) ->enableBatching() ->withBatchSizeLimit(2) - ->withHandler(function (Collection $messages) use (&$stopped) { + ->withHandler(function (Collection $messages, MessageConsumer $consumer) { //stop consumer after first batch - $this->consumer->onStopConsuming(function () use (&$stopped) { - $stopped = true; - })->stopConsuming(); + $consumer->stopConsuming(); }) ->build(); $this->consumer->consume(); - //testing stop callback - $this->assertTrue((bool)$stopped); //should have consumed only two messages $this->assertEquals(2, $this->consumer->consumedMessagesCount()); }