Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set onStopConsuming while building consumer and make MessageConsumer accessible to handler functions #260

Merged
merged 8 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions docs/6-upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,27 @@ The `withSasl` method now accepts all `SASL` parameters instead of a `Sasl` obje
public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT');
```

#### Setting `onStopConsuming` callbacks

To set `onStopConsuming` callbacks you need to define them while building the consumer, instead of after calling the `build` method as in `v1.13.x`:

```diff
$consumer = Kafka::consumer(['topic'])
->withConsumerGroupId('group')
->withHandler(new Handler)
+ ->onStopConsuming(static function () {
+ // Do something when the consumer stop consuming messages
+ })
->build()
- ->onStopConsuming(static function () {
- // Do something when the consumer stop consuming messages
- })
```


### Updating dependencies
**PHP 8.1 Required**
**PHP 8.2 Required**

This package now requires PHP 8.1 or higher.
This package now requires PHP 8.2 or higher.

You can use tools such as [rector](https://github.com/rectorphp/rector) to upgrade your app to PHP 8.1.
You can use tools such as [rector](https://github.com/rectorphp/rector) to upgrade your app to PHP 8.2.
6 changes: 3 additions & 3 deletions docs/advanced-usage/2-graceful-shutdown.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ weight: 2

Stopping consumers is very useful if you want to ensure you don't kill a process halfway through processing a consumed message.

Starting from version `1.12.x` of this package, consumers automatically listen to the `SIGTERM`, `SIGINT` and `SIQUIT` signals, which means you can easily stop your consumers using those signals.
Consumers automatically listen to the `SIGTERM`, `SIGINT` and `SIQUIT` signals, which means you can easily stop your consumers using those signals.

### Running callbacks when the consumer stops
If your app requires that you run sum sort of processing when the consumers stop processing messages, you can use the `onStopConsume` method, available on the `\Junges\Kafka\Contracts\CanConsumeMessages` interface. This method accepts a `Closure` that will run once your consumer stops consuming.
Expand All @@ -16,12 +16,12 @@ use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::consumer(['topic'])
->withConsumerGroupId('group')
->withHandler(new Handler)
->build()
->onStopConsuming(static function () {
// Do something when the consumer stop consuming messages
})
->build();

$consumer->consume();
```

> You will require the [ Process Control Extension ](https://www.php.net/manual/en/book.pcntl.php) to be installed to utilise this feature.
> This features requires [ Process Control Extension ](https://www.php.net/manual/en/book.pcntl.php) to be installed.
25 changes: 25 additions & 0 deletions docs/advanced-usage/6-stopping-a-consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
title: Stop consumer on demand
weight: 6
---

Sometimes, you may want to stop your consumer based on a given message or any other condition.

You can do it by adding a calling `stopConsuming()` method on the `MessageConsumer` instance that is passed as the
second argument of your message handler:

```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer(['topic'])
->withConsumerGroupId('group')
->stopAfterLastMessage()
->withHandler(static function (\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
if ($someCondition) {
$consumer->stopConsuming();
}
})
->build();

$consumer->consume();
```

The `onStopConsuming` callback will be executed before stopping your consumer.
4 changes: 2 additions & 2 deletions docs/consuming-messages/10-class-structure.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Consumer classes are very simple, and it is basically a Laravel Command class. T
namespace App\Console\Commands\Consumers;

use Illuminate\Console\Command;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Contracts\MessageConsumer;use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Contracts\ConsumerMessage;

class MyTopicConsumer extends Command
Expand All @@ -25,7 +25,7 @@ class MyTopicConsumer extends Command
$consumer = Kafka::consumer(['my-topic'])
->withBrokers('localhost:8092')
->withAutoCommit()
->withHandler(function(ConsumerMessage $message) {
->withHandler(function(ConsumerMessage $message, MessageConsumer $consumer) {
// Handle your message here
})
->build();
Expand Down
5 changes: 5 additions & 0 deletions docs/consuming-messages/11-queueable-handlers.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,9 @@ class Handler implements HandlerContract, ShouldQueue
}
```

As you can see on the `__invoke` method, queued handlers does not have access to a `MessageConsumer` instance when handling the message,
because it's running on a laravel queue and there are no actions that can be performed asynchronously on Kafka message consumer.

After creating your handler class, you can use it just as a normal handler, and `laravel-kafka` will know how to handle it under the hoods 😄.


6 changes: 3 additions & 3 deletions docs/consuming-messages/5-message-handlers.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ You can use an invokable class or a simple callback. Use the `withHandler` metho
$consumer = \Junges\Kafka\Facades\Kafka::consumer();

// Using callback:
$consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message) {
$consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
// Handle your message here
});
```
Expand All @@ -20,15 +20,15 @@ Or, using an invokable class:
```php
class Handler
{
public function __invoke(\Junges\Kafka\Contracts\ConsumerMessage $message){
public function __invoke(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
// Handle your message here
}
}

$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withHandler(new Handler)
```

The `KafkaConsumerMessage` contract gives you some handy methods to get the message properties:
The `ConsumerMessage` contract gives you some handy methods to get the message properties:

- `getKey()`: Returns the Kafka Message Key
- `getTopicName()`: Returns the topic where the message was published
Expand Down
2 changes: 1 addition & 1 deletion docs/consuming-messages/9-handling-message-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ $consumer = \Junges\Kafka\Facades\Kafka::consumer()
->enableBatching()
->withBatchSizeLimit(1000)
->withBatchReleaseInterval(1500)
->withHandler(function (\Illuminate\Support\Collection $collection) {
->withHandler(function (\Illuminate\Support\Collection $collection, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
// Handle batch
})
->build();
Expand Down
5 changes: 3 additions & 2 deletions src/Concerns/HandleConsumedMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
use Closure;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\Handler;
use Junges\Kafka\Contracts\MessageConsumer;

/**
* @internal
* @mixin \Junges\Kafka\Concerns\PrepareMiddlewares
*/
trait HandleConsumedMessage
{
private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure $handler, array $middlewares = []): void
private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure $handler, ?MessageConsumer $consumer = null, array $middlewares = []): void
{
$middlewares = array_map($this->wrapMiddleware(...), $middlewares);
$middlewares = array_reverse($middlewares);
Expand All @@ -21,6 +22,6 @@ private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure
$handler = $middleware($handler);
}

$handler($message);
$handler($message, $consumer);
}
}
7 changes: 7 additions & 0 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Junges\Kafka\Config;

use Closure;
use JetBrains\PhpStorm\Pure;
use Junges\Kafka\Contracts\Consumer;
use Junges\Kafka\Contracts\HandlesBatchConfiguration;
Expand Down Expand Up @@ -83,6 +84,7 @@ public function __construct(
private readonly array $afterConsumingCallbacks = [],
private readonly int $maxTime = 0,
private readonly array $partitionAssignment = [],
private readonly ?Closure $whenStopConsuming = null,
) {
}

Expand Down Expand Up @@ -225,4 +227,9 @@ public function getPartitionAssigment(): array
{
return $this->partitionAssignment;
}

public function getWhenStopConsumingCallback(): ?Closure
{
return $this->whenStopConsuming;
}
}
10 changes: 10 additions & 0 deletions src/Consumers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class Builder implements ConsumerBuilderContract
/** @var array<int, TopicPartition> */
protected array $partitionAssignment = [];

protected ?Closure $onStopConsuming = null;

protected function __construct(protected string $brokers, array $topics = [], protected ?string $groupId = null)
{
if (count($topics) > 0) {
Expand Down Expand Up @@ -314,6 +316,13 @@ public function assignPartitions(array $partitionAssignment): self
return $this;
}

public function onStopConsuming(callable $onStopConsuming): self
{
$this->onStopConsuming = $onStopConsuming(...);

return $this;
}

/** @inheritDoc */
public function build(): MessageConsumer
{
Expand All @@ -337,6 +346,7 @@ public function build(): MessageConsumer
afterConsumingCallbacks: $this->afterConsumingCallbacks,
maxTime: $this->maxTime,
partitionAssignment: $this->partitionAssignment,
whenStopConsuming: $this->onStopConsuming,
);

return new Consumer($config, $this->deserializer, $this->committerFactory);
Expand Down
9 changes: 5 additions & 4 deletions src/Consumers/CallableBatchConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
use Closure;
use Illuminate\Support\Collection;
use Junges\Kafka\Contracts\BatchMessageConsumer;
use Junges\Kafka\Contracts\MessageConsumer;

class CallableBatchConsumer implements BatchMessageConsumer
readonly class CallableBatchConsumer implements BatchMessageConsumer
{
public function __construct(private readonly Closure $batchHandler)
public function __construct(private Closure $batchHandler)
{
}

public function handle(Collection $collection): void
public function handle(Collection $collection, MessageConsumer $consumer): void
{
($this->batchHandler)($collection);
($this->batchHandler)($collection, $consumer);
}
}
9 changes: 5 additions & 4 deletions src/Consumers/CallableConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Junges\Kafka\Contracts\Consumer;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\Handler;
use Junges\Kafka\Contracts\MessageConsumer;

class CallableConsumer extends Consumer
{
Expand All @@ -29,7 +30,7 @@ public function __construct(private Closure|Handler $handler, private readonly a
}

/** Handle the received message. */
public function handle(ConsumerMessage $message): void
public function handle(ConsumerMessage $message, MessageConsumer $consumer): void
{
// If the message handler should be queued, we will dispatch a job to handle this message.
// Otherwise, the message will be handled synchronously.
Expand All @@ -39,17 +40,17 @@ public function handle(ConsumerMessage $message): void
return;
}

$this->handleMessageSynchronously($message);
$this->handleMessageSynchronously($message, $consumer);
}

private function shouldQueueHandler(): bool
{
return $this->handler instanceof ShouldQueue;
}

private function handleMessageSynchronously(ConsumerMessage $message): void
private function handleMessageSynchronously(ConsumerMessage $message, MessageConsumer $consumer): void
{
$this->handleConsumedMessage($message, $this->handler, $this->middlewares);
$this->handleConsumedMessage($message, $this->handler, $consumer, $this->middlewares);
}

/**
Expand Down
18 changes: 7 additions & 11 deletions src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Consumer implements MessageConsumer
private readonly Retryable $retryable;
private readonly CommitterFactory $committerFactory;
private bool $stopRequested = false;
private ?Closure $whenStopConsuming = null;
private ?Closure $whenStopConsuming;
protected int $lastRestart = 0;
protected Timer $restartTimer;
private Dispatcher $dispatcher;
Expand All @@ -74,6 +74,7 @@ public function __construct(private readonly Config $config, private readonly Me

$this->committerFactory = $committerFactory ?? new DefaultCommitterFactory($this->messageCounter);
$this->dispatcher = App::make(Dispatcher::class);
$this->whenStopConsuming = $this->config->getWhenStopConsumingCallback();
}

/**
Expand Down Expand Up @@ -170,14 +171,6 @@ public function stopConsuming(): void
$this->stopRequested = true;
}

/** @inheritdoc */
public function onStopConsuming(?Closure $onStopConsuming = null): self
{
$this->whenStopConsuming = $onStopConsuming;

return $this;
}

/** Will cancel the stopConsume request initiated by calling the stopConsume method */
public function cancelStopConsume(): void
{
Expand Down Expand Up @@ -232,7 +225,10 @@ private function executeMessage(Message $message): void
// was received and will be consumed as soon as a consumer is available to process it.
$this->dispatcher->dispatch(new StartedConsumingMessage($consumedMessage));

$this->config->getConsumer()->handle($consumedMessage = $this->deserializer->deserialize($consumedMessage));
$this->config->getConsumer()->handle(
$consumedMessage = $this->deserializer->deserialize($consumedMessage),
$this
);
$success = true;

// Dispatch an event informing that a message was consumed.
Expand Down Expand Up @@ -283,7 +279,7 @@ private function executeBatch(Collection $collection): void
$consumedMessages = $collection
->map(fn (Message $message) => $this->deserializer->deserialize($this->getConsumerMessage($message)));

$this->config->getBatchConfig()->getConsumer()->handle($consumedMessages);
$this->config->getBatchConfig()->getConsumer()->handle($consumedMessages, $this);

$collection->each(fn (Message $message) => $this->commit($message, true));
} catch (Throwable $throwable) {
Expand Down
7 changes: 6 additions & 1 deletion src/Consumers/DispatchQueuedHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public function __construct(

public function handle(): void
{
$this->handleConsumedMessage($this->message, $this->handler, $this->middlewares);
// Queued handlers does not have access to an instance of the MessageConsumer class.
$this->handleConsumedMessage(
message: $this->message,
handler: $this->handler,
middlewares: $this->middlewares
);
}
}
5 changes: 3 additions & 2 deletions src/Consumers/NullBatchConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

use Illuminate\Support\Collection;
use Junges\Kafka\Contracts\BatchMessageConsumer;
use Junges\Kafka\Contracts\MessageConsumer;

class NullBatchConsumer implements BatchMessageConsumer
readonly class NullBatchConsumer implements BatchMessageConsumer
{
public function handle(Collection $collection): void
public function handle(Collection $collection, MessageConsumer $consumer): void
{
}
}
2 changes: 1 addition & 1 deletion src/Contracts/BatchMessageConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
interface BatchMessageConsumer
{
/** Handles messages released from batch repository. */
public function handle(Collection $collection): void;
public function handle(Collection $collection, MessageConsumer $consumer): void;
}
2 changes: 1 addition & 1 deletion src/Contracts/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

abstract class Consumer
{
abstract public function handle(ConsumerMessage $message): void;
abstract public function handle(ConsumerMessage $message, MessageConsumer $consumer): void;

/** @throws Throwable */
public function failed(string $message, string $topic, Throwable $exception): never
Expand Down
3 changes: 3 additions & 0 deletions src/Contracts/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ public function subscribe(...$topics): self;
/** Assigns a set of partitions this consumer should consume from. */
public function assignPartitions(array $partitionAssignment): self;

/** Defines a callback to be executed when consumer stops consuming messages. */
public function onStopConsuming(callable $onStopConsuming): self;

/** Set the brokers the kafka consumer should use. */
public function withBrokers(?string $brokers): self;

Expand Down
Loading
Loading