Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/v2.x' into feat/partition-assign…
Browse files Browse the repository at this point in the history
…ment

# Conflicts:
#	src/Config/Config.php
#	src/Consumers/ConsumerBuilder.php
  • Loading branch information
mateusjunges committed Jan 9, 2024
2 parents deff49a + 98c49ff commit 9895f73
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 4 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@

All relevant changes to `mateusjunges/laravel-kafka` will be documented here.

## [2023-10-xx v2.0.0](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.x...v2.0.0)
## [2024-01-xx v2.0.0](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.x...v2.0.0)
- Add the ability to queue message handlers by @mateusjunges in [#177](https://github.com/mateusjunges/laravel-kafka/pull/177)
- Add after and before consuming callbacks by @mateusjunges and @ebrahimradi in [#192](https://github.com/mateusjunges/laravel-kafka/pull/192)
- Add simple events to kafka producer and consumers by @mateusjunges in [#193](https://github.com/mateusjunges/laravel-kafka/pull/193)
- Append throwable/exception info when sending a message to DLQ by @mateusjunges in [#194](https://github.com/mateusjunges/laravel-kafka/pull/194)
- Add the ability to assign partitions to a consumer by @mateusjunges in [#234](https://github.com/mateusjunges/laravel-kafka/pull/234)

## [2024-01-09 v1.13.6](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.6...v1.13.7)
* Add timeout feature for consumers by @mihaileu in [#233](https://github.com/mateusjunges/laravel-kafka/pull/233)

## [2023-12-07 v1.13.6](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.5...v1.13.6)
* Remove internal annotation from interface by @mateusjunges

## [2023-10-24 v1.13.5](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.4...v1.13.5)
* Fixed default securityProtocol config by @SergkeiM in [#215](https://github.com/mateusjunges/laravel-kafka/pull/215)

Expand Down
8 changes: 8 additions & 0 deletions docs/consuming-messages/6-configuring-consumer-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ kafka consumer:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxMessages(2);
```

### Configuring the max time when a consumer can process messages
If you want to consume a limited amount of time, you can use the `withMaxTime` method to set the max number of seconds for
kafka consumer to process messages:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxTime(3600);
```

### Setting Kafka configuration options
To set configuration options, you can use two methods: `withOptions`, passing an array of option and option value or, using the `withOption method and
passing two arguments, the option name and the option value.
Expand Down
6 changes: 6 additions & 0 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public function __construct(
private readonly array $callbacks = [],
private readonly array $beforeConsumingCallbacks = [],
private readonly array $afterConsumingCallbacks = [],
private readonly int $maxTime = 0,
private readonly array $partitionAssignment = [],
) {
}
Expand Down Expand Up @@ -115,6 +116,11 @@ public function getMaxMessages(): int
return $this->maxMessages;
}

public function getMaxTime(): int
{
return $this->maxTime;
}

public function isAutoCommit(): bool
{
return $this->autoCommit;
Expand Down
19 changes: 17 additions & 2 deletions src/Console/Commands/ConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class ConsumerCommand extends Command
{--commit=1}
{--dlq=? : The Dead Letter Queue}
{--maxMessage=? : The max number of messages that should be handled}
{--maxTime=0 : The max number of seconds that a consumer should run }
{--securityProtocol=?}';

/* @var string $description */
Expand Down Expand Up @@ -56,7 +57,7 @@ public function handle()
return;
}

$parsedOptions = array_map(fn ($value) => $value === '?' ? null : $value, $this->options());
$parsedOptions = array_map($this->parseOptions(...), $this->options());

$options = new Options($parsedOptions, $this->config);

Expand All @@ -72,7 +73,8 @@ public function handle()
consumer: app($consumer),
sasl: $options->getSasl(),
dlq: $options->getDlq(),
maxMessages: $options->getMaxMessages()
maxMessages: $options->getMaxMessages(),
maxTime: $options->getMaxTime(),
);

/** @var Consumer $consumer */
Expand All @@ -83,4 +85,17 @@ public function handle()

$consumer->consume();
}

private function parseOptions(int|string|null $option): int|string|null
{
if ($option === '?') {
return null;
}

if (is_numeric($option)) {
return (int) $option;
}

return $option;
}
}
6 changes: 6 additions & 0 deletions src/Console/Commands/KafkaConsumer/Options.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ final class Options
private int|string|null $commit = 1;
private ?string $dlq = null;
private int $maxMessages = -1;
private int $maxTime = 0;
private ?string $securityProtocol = 'plaintext';
private readonly ?string $saslUsername;
private readonly ?string $saslPassword;
Expand Down Expand Up @@ -68,6 +69,11 @@ public function getMaxMessages(): int
return $this->maxMessages >= 1 ? $this->maxMessages : -1;
}

public function getMaxTime(): int
{
return $this->maxTime;
}

#[Pure]
public function getSasl(): ?Sasl
{
Expand Down
16 changes: 15 additions & 1 deletion src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Junges\Kafka\Exceptions\ConsumerException;
use Junges\Kafka\MessageCounter;
use Junges\Kafka\Retryable;
use Junges\Kafka\Support\InfiniteTimer;
use Junges\Kafka\Support\Timer;
use RdKafka\Conf;
use RdKafka\KafkaConsumer;
Expand Down Expand Up @@ -82,6 +83,7 @@ public function consume(): void
{
$this->cancelStopConsume();
$this->configureRestartTimer();
$stopTimer = $this->configureStopTimer();

if ($this->supportAsyncSignals()) {
$this->listenForSignals();
Expand Down Expand Up @@ -117,7 +119,7 @@ public function consume(): void
$this->retryable->retry(fn () => $this->doConsume());
$this->runAfterConsumingCallbacks();
$this->checkForRestart();
} while (! $this->maxMessagesLimitReached() && ! $this->stopRequested);
} while (! $this->maxMessagesLimitReached() && ! $stopTimer->isTimedOut() && ! $this->stopRequested);

if ($this->shouldRunStopConsumingCallback()) {
$callback = $this->whenStopConsuming;
Expand Down Expand Up @@ -367,6 +369,18 @@ private function maxMessagesLimitReached(): bool
return $this->messageCounter->maxMessagesLimitReached();
}

public function configureStopTimer(): Timer
{
$stopTimer = new Timer();
if ($this->config->getMaxTime() === 0) {
$stopTimer = new InfiniteTimer();
}

$stopTimer->start($this->config->getMaxTime() * 1000);

return $stopTimer;
}

/**
* Handle the message.
*
Expand Down
12 changes: 12 additions & 0 deletions src/Consumers/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ConsumerBuilder implements ConsumerBuilderContract
protected int $commit;
protected Closure | Handler $handler;
protected int $maxMessages;
protected int $maxTime = 0;
protected int $maxCommitRetries;

/** @var list<callable> */
Expand Down Expand Up @@ -163,6 +164,16 @@ public function withMaxMessages(int $maxMessages): self
return $this;
}

/**
* @inheritDoc
*/
public function withMaxTime(int $maxTime): self
{
$this->maxTime = $maxTime;

return $this;
}

/** @inheritDoc */
public function withMaxCommitRetries(int $maxCommitRetries): self
{
Expand Down Expand Up @@ -317,6 +328,7 @@ public function build(): MessageConsumer
callbacks: $this->callbacks,
beforeConsumingCallbacks: $this->beforeConsumingCallbacks,
afterConsumingCallbacks: $this->afterConsumingCallbacks,
maxTime: $this->maxTime,
partitionAssignment: $this->partitionAssignment,
);

Expand Down
14 changes: 14 additions & 0 deletions src/Contracts/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ public function usingCommitterFactory(CommitterFactory $committerFactory): self;
public function withMaxMessages(int $maxMessages): self;

/** Specify the max retries attempts. */
/**
* Define the max number seconds that a consumer should run
*
* @param int $maxTime
* @return \Junges\Kafka\Consumers\ConsumerBuilder
*/
public function withMaxTime(int $maxTime): self;

/**
* Specify the max retries attempts.
*
* @param int $maxCommitRetries
* @return \Junges\Kafka\Consumers\ConsumerBuilder
*/
public function withMaxCommitRetries(int $maxCommitRetries): self;

/**
Expand Down
11 changes: 11 additions & 0 deletions src/Support/InfiniteTimer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

namespace Junges\Kafka\Support;

class InfiniteTimer extends Timer
{
public function isTimedOut(): bool
{
return false;
}
}
50 changes: 50 additions & 0 deletions tests/Consumers/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,56 @@ function (ConsumerMessage $message) {
$this->assertEquals(1, $consumer->consumedMessagesCount());
}

public function testCanStopConsumeIfMaxTimeReached()
{
$message = new Message();
$message->err = 0;
$message->key = 'key';
$message->topic_name = 'test';
$message->payload = '{"body": "message payload"}';
$message->offset = 0;
$message->partition = 1;
$message->headers = [];

$message2 = new Message();
$message2->err = 0;
$message2->key = 'key2';
$message2->topic_name = 'test2';
$message2->payload = '{"body": "message payload2"}';
$message2->offset = 0;
$message2->partition = 1;
$message2->headers = [];

$this->mockConsumerWithMessage($message, $message2);
$this->mockProducer();

$fakeHandler = new CallableConsumer(
function (ConsumerMessage $message) {
sleep(2);
},
[]
);

$config = new Config(
broker: 'broker',
topics: ['test-topic'],
securityProtocol: 'security',
commit: 1,
groupId: 'group',
consumer: $fakeHandler,
sasl: null,
dlq: null,
maxMessages: 2,
maxTime: 1,
);

$consumer = new Consumer($config, new JsonDeserializer());
$consumer->consume();

//finally only one message should be consumed
$this->assertEquals(1, $consumer->consumedMessagesCount());
}

public function testItRunCallbacksBeforeConsume(): void
{
$fakeHandler = new FakeHandler();
Expand Down

0 comments on commit 9895f73

Please sign in to comment.