Skip to content

Commit

Permalink
Set onStopConsuming while building consumer and make `MessageConsum…
Browse files Browse the repository at this point in the history
…er` accessible to handler functions (#260)

* Pass instance of message consumer to handler functions

* Set onStopConsuming callback on builder instance instead of consumer

* Improve docs

* Improve docs

* wip

* Add note about message consumer not available for queued handlers

* Fix docs

* Update upgrade guide
  • Loading branch information
mateusjunges authored Feb 20, 2024
1 parent 5d3bc64 commit cdb48cd
Show file tree
Hide file tree
Showing 25 changed files with 135 additions and 70 deletions.
24 changes: 21 additions & 3 deletions docs/6-upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,27 @@ The `withSasl` method now accepts all `SASL` parameters instead of a `Sasl` obje
public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT');
```

#### Setting `onStopConsuming` callbacks

To set `onStopConsuming` callbacks you need to define them while building the consumer, instead of after calling the `build` method as in `v1.13.x`:

```diff
$consumer = Kafka::consumer(['topic'])
->withConsumerGroupId('group')
->withHandler(new Handler)
+ ->onStopConsuming(static function () {
+ // Do something when the consumer stop consuming messages
+ })
->build()
- ->onStopConsuming(static function () {
- // Do something when the consumer stop consuming messages
- })
```


### Updating dependencies
**PHP 8.1 Required**
**PHP 8.2 Required**

This package now requires PHP 8.1 or higher.
This package now requires PHP 8.2 or higher.

You can use tools such as [rector](https://github.com/rectorphp/rector) to upgrade your app to PHP 8.1.
You can use tools such as [rector](https://github.com/rectorphp/rector) to upgrade your app to PHP 8.2.
6 changes: 3 additions & 3 deletions docs/advanced-usage/2-graceful-shutdown.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ weight: 2

Stopping consumers is very useful if you want to ensure you don't kill a process halfway through processing a consumed message.

Starting from version `1.12.x` of this package, consumers automatically listen to the `SIGTERM`, `SIGINT` and `SIQUIT` signals, which means you can easily stop your consumers using those signals.
Consumers automatically listen to the `SIGTERM`, `SIGINT` and `SIQUIT` signals, which means you can easily stop your consumers using those signals.

### Running callbacks when the consumer stops
If your app requires that you run sum sort of processing when the consumers stop processing messages, you can use the `onStopConsume` method, available on the `\Junges\Kafka\Contracts\CanConsumeMessages` interface. This method accepts a `Closure` that will run once your consumer stops consuming.
Expand All @@ -16,12 +16,12 @@ use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::consumer(['topic'])
->withConsumerGroupId('group')
->withHandler(new Handler)
->build()
->onStopConsuming(static function () {
// Do something when the consumer stop consuming messages
})
->build();

$consumer->consume();
```

> You will require the [ Process Control Extension ](https://www.php.net/manual/en/book.pcntl.php) to be installed to utilise this feature.
> This features requires [ Process Control Extension ](https://www.php.net/manual/en/book.pcntl.php) to be installed.
25 changes: 25 additions & 0 deletions docs/advanced-usage/6-stopping-a-consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
title: Stop consumer on demand
weight: 6
---

Sometimes, you may want to stop your consumer based on a given message or any other condition.

You can do it by adding a calling `stopConsuming()` method on the `MessageConsumer` instance that is passed as the
second argument of your message handler:

```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer(['topic'])
->withConsumerGroupId('group')
->stopAfterLastMessage()
->withHandler(static function (\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
if ($someCondition) {
$consumer->stopConsuming();
}
})
->build();

$consumer->consume();
```

The `onStopConsuming` callback will be executed before stopping your consumer.
4 changes: 2 additions & 2 deletions docs/consuming-messages/10-class-structure.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Consumer classes are very simple, and it is basically a Laravel Command class. T
namespace App\Console\Commands\Consumers;

use Illuminate\Console\Command;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Contracts\MessageConsumer;use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Contracts\ConsumerMessage;

class MyTopicConsumer extends Command
Expand All @@ -25,7 +25,7 @@ class MyTopicConsumer extends Command
$consumer = Kafka::consumer(['my-topic'])
->withBrokers('localhost:8092')
->withAutoCommit()
->withHandler(function(ConsumerMessage $message) {
->withHandler(function(ConsumerMessage $message, MessageConsumer $consumer) {
// Handle your message here
})
->build();
Expand Down
5 changes: 5 additions & 0 deletions docs/consuming-messages/11-queueable-handlers.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,9 @@ class Handler implements HandlerContract, ShouldQueue
}
```

As you can see on the `__invoke` method, queued handlers does not have access to a `MessageConsumer` instance when handling the message,
because it's running on a laravel queue and there are no actions that can be performed asynchronously on Kafka message consumer.

After creating your handler class, you can use it just as a normal handler, and `laravel-kafka` will know how to handle it under the hoods 😄.


6 changes: 3 additions & 3 deletions docs/consuming-messages/5-message-handlers.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ You can use an invokable class or a simple callback. Use the `withHandler` metho
$consumer = \Junges\Kafka\Facades\Kafka::consumer();

// Using callback:
$consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message) {
$consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
// Handle your message here
});
```
Expand All @@ -20,15 +20,15 @@ Or, using an invokable class:
```php
class Handler
{
public function __invoke(\Junges\Kafka\Contracts\ConsumerMessage $message){
public function __invoke(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
// Handle your message here
}
}

$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withHandler(new Handler)
```

The `KafkaConsumerMessage` contract gives you some handy methods to get the message properties:
The `ConsumerMessage` contract gives you some handy methods to get the message properties:

- `getKey()`: Returns the Kafka Message Key
- `getTopicName()`: Returns the topic where the message was published
Expand Down
2 changes: 1 addition & 1 deletion docs/consuming-messages/9-handling-message-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ $consumer = \Junges\Kafka\Facades\Kafka::consumer()
->enableBatching()
->withBatchSizeLimit(1000)
->withBatchReleaseInterval(1500)
->withHandler(function (\Illuminate\Support\Collection $collection) {
->withHandler(function (\Illuminate\Support\Collection $collection, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
// Handle batch
})
->build();
Expand Down
5 changes: 3 additions & 2 deletions src/Concerns/HandleConsumedMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
use Closure;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\Handler;
use Junges\Kafka\Contracts\MessageConsumer;

/**
* @internal
* @mixin \Junges\Kafka\Concerns\PrepareMiddlewares
*/
trait HandleConsumedMessage
{
private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure $handler, array $middlewares = []): void
private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure $handler, ?MessageConsumer $consumer = null, array $middlewares = []): void
{
$middlewares = array_map($this->wrapMiddleware(...), $middlewares);
$middlewares = array_reverse($middlewares);
Expand All @@ -21,6 +22,6 @@ private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure
$handler = $middleware($handler);
}

$handler($message);
$handler($message, $consumer);
}
}
7 changes: 7 additions & 0 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Junges\Kafka\Config;

use Closure;
use JetBrains\PhpStorm\Pure;
use Junges\Kafka\Contracts\Consumer;
use Junges\Kafka\Contracts\HandlesBatchConfiguration;
Expand Down Expand Up @@ -83,6 +84,7 @@ public function __construct(
private readonly array $afterConsumingCallbacks = [],
private readonly int $maxTime = 0,
private readonly array $partitionAssignment = [],
private readonly ?Closure $whenStopConsuming = null,
) {
}

Expand Down Expand Up @@ -225,4 +227,9 @@ public function getPartitionAssigment(): array
{
return $this->partitionAssignment;
}

public function getWhenStopConsumingCallback(): ?Closure
{
return $this->whenStopConsuming;
}
}
10 changes: 10 additions & 0 deletions src/Consumers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class Builder implements ConsumerBuilderContract
/** @var array<int, TopicPartition> */
protected array $partitionAssignment = [];

protected ?Closure $onStopConsuming = null;

protected function __construct(protected string $brokers, array $topics = [], protected ?string $groupId = null)
{
if (count($topics) > 0) {
Expand Down Expand Up @@ -314,6 +316,13 @@ public function assignPartitions(array $partitionAssignment): self
return $this;
}

public function onStopConsuming(callable $onStopConsuming): self
{
$this->onStopConsuming = $onStopConsuming(...);

return $this;
}

/** @inheritDoc */
public function build(): MessageConsumer
{
Expand All @@ -337,6 +346,7 @@ public function build(): MessageConsumer
afterConsumingCallbacks: $this->afterConsumingCallbacks,
maxTime: $this->maxTime,
partitionAssignment: $this->partitionAssignment,
whenStopConsuming: $this->onStopConsuming,
);

return new Consumer($config, $this->deserializer, $this->committerFactory);
Expand Down
9 changes: 5 additions & 4 deletions src/Consumers/CallableBatchConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
use Closure;
use Illuminate\Support\Collection;
use Junges\Kafka\Contracts\BatchMessageConsumer;
use Junges\Kafka\Contracts\MessageConsumer;

class CallableBatchConsumer implements BatchMessageConsumer
readonly class CallableBatchConsumer implements BatchMessageConsumer
{
public function __construct(private readonly Closure $batchHandler)
public function __construct(private Closure $batchHandler)
{
}

public function handle(Collection $collection): void
public function handle(Collection $collection, MessageConsumer $consumer): void
{
($this->batchHandler)($collection);
($this->batchHandler)($collection, $consumer);
}
}
9 changes: 5 additions & 4 deletions src/Consumers/CallableConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Junges\Kafka\Contracts\Consumer;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\Handler;
use Junges\Kafka\Contracts\MessageConsumer;

class CallableConsumer extends Consumer
{
Expand All @@ -29,7 +30,7 @@ public function __construct(private Closure|Handler $handler, private readonly a
}

/** Handle the received message. */
public function handle(ConsumerMessage $message): void
public function handle(ConsumerMessage $message, MessageConsumer $consumer): void
{
// If the message handler should be queued, we will dispatch a job to handle this message.
// Otherwise, the message will be handled synchronously.
Expand All @@ -39,17 +40,17 @@ public function handle(ConsumerMessage $message): void
return;
}

$this->handleMessageSynchronously($message);
$this->handleMessageSynchronously($message, $consumer);
}

private function shouldQueueHandler(): bool
{
return $this->handler instanceof ShouldQueue;
}

private function handleMessageSynchronously(ConsumerMessage $message): void
private function handleMessageSynchronously(ConsumerMessage $message, MessageConsumer $consumer): void
{
$this->handleConsumedMessage($message, $this->handler, $this->middlewares);
$this->handleConsumedMessage($message, $this->handler, $consumer, $this->middlewares);
}

/**
Expand Down
18 changes: 7 additions & 11 deletions src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Consumer implements MessageConsumer
private readonly Retryable $retryable;
private readonly CommitterFactory $committerFactory;
private bool $stopRequested = false;
private ?Closure $whenStopConsuming = null;
private ?Closure $whenStopConsuming;
protected int $lastRestart = 0;
protected Timer $restartTimer;
private Dispatcher $dispatcher;
Expand All @@ -74,6 +74,7 @@ public function __construct(private readonly Config $config, private readonly Me

$this->committerFactory = $committerFactory ?? new DefaultCommitterFactory($this->messageCounter);
$this->dispatcher = App::make(Dispatcher::class);
$this->whenStopConsuming = $this->config->getWhenStopConsumingCallback();
}

/**
Expand Down Expand Up @@ -170,14 +171,6 @@ public function stopConsuming(): void
$this->stopRequested = true;
}

/** @inheritdoc */
public function onStopConsuming(?Closure $onStopConsuming = null): self
{
$this->whenStopConsuming = $onStopConsuming;

return $this;
}

/** Will cancel the stopConsume request initiated by calling the stopConsume method */
public function cancelStopConsume(): void
{
Expand Down Expand Up @@ -232,7 +225,10 @@ private function executeMessage(Message $message): void
// was received and will be consumed as soon as a consumer is available to process it.
$this->dispatcher->dispatch(new StartedConsumingMessage($consumedMessage));

$this->config->getConsumer()->handle($consumedMessage = $this->deserializer->deserialize($consumedMessage));
$this->config->getConsumer()->handle(
$consumedMessage = $this->deserializer->deserialize($consumedMessage),
$this
);
$success = true;

// Dispatch an event informing that a message was consumed.
Expand Down Expand Up @@ -283,7 +279,7 @@ private function executeBatch(Collection $collection): void
$consumedMessages = $collection
->map(fn (Message $message) => $this->deserializer->deserialize($this->getConsumerMessage($message)));

$this->config->getBatchConfig()->getConsumer()->handle($consumedMessages);
$this->config->getBatchConfig()->getConsumer()->handle($consumedMessages, $this);

$collection->each(fn (Message $message) => $this->commit($message, true));
} catch (Throwable $throwable) {
Expand Down
7 changes: 6 additions & 1 deletion src/Consumers/DispatchQueuedHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public function __construct(

public function handle(): void
{
$this->handleConsumedMessage($this->message, $this->handler, $this->middlewares);
// Queued handlers does not have access to an instance of the MessageConsumer class.
$this->handleConsumedMessage(
message: $this->message,
handler: $this->handler,
middlewares: $this->middlewares
);
}
}
5 changes: 3 additions & 2 deletions src/Consumers/NullBatchConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

use Illuminate\Support\Collection;
use Junges\Kafka\Contracts\BatchMessageConsumer;
use Junges\Kafka\Contracts\MessageConsumer;

class NullBatchConsumer implements BatchMessageConsumer
readonly class NullBatchConsumer implements BatchMessageConsumer
{
public function handle(Collection $collection): void
public function handle(Collection $collection, MessageConsumer $consumer): void
{
}
}
2 changes: 1 addition & 1 deletion src/Contracts/BatchMessageConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
interface BatchMessageConsumer
{
/** Handles messages released from batch repository. */
public function handle(Collection $collection): void;
public function handle(Collection $collection, MessageConsumer $consumer): void;
}
2 changes: 1 addition & 1 deletion src/Contracts/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

abstract class Consumer
{
abstract public function handle(ConsumerMessage $message): void;
abstract public function handle(ConsumerMessage $message, MessageConsumer $consumer): void;

/** @throws Throwable */
public function failed(string $message, string $topic, Throwable $exception): never
Expand Down
3 changes: 3 additions & 0 deletions src/Contracts/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ public function subscribe(...$topics): self;
/** Assigns a set of partitions this consumer should consume from. */
public function assignPartitions(array $partitionAssignment): self;

/** Defines a callback to be executed when consumer stops consuming messages. */
public function onStopConsuming(callable $onStopConsuming): self;

/** Set the brokers the kafka consumer should use. */
public function withBrokers(?string $brokers): self;

Expand Down
Loading

0 comments on commit cdb48cd

Please sign in to comment.