Skip to content

Commit

Permalink
[v2.x] Partition assignment (#234)
Browse files Browse the repository at this point in the history
* Experimenting with partition assignment

* Fix partition assignment

* Revert "wip: docs"

This reverts commit f219cc1.

* Updating docs

* Rename variable

* Rename variable

* Update changelog
  • Loading branch information
mateusjunges authored Jan 9, 2024
1 parent 98c49ff commit 389b06d
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ All relevant changes to `mateusjunges/laravel-kafka` will be documented here.
- Add after and before consuming callbacks by @mateusjunges and @ebrahimradi in [#192](https://github.com/mateusjunges/laravel-kafka/pull/192)
- Add simple events to kafka producer and consumers by @mateusjunges in [#193](https://github.com/mateusjunges/laravel-kafka/pull/193)
- Append throwable/exception info when sending a message to DLQ by @mateusjunges in [#194](https://github.com/mateusjunges/laravel-kafka/pull/194)
- Add the ability to assign partitions to a consumer by @mateusjunges in [#234](https://github.com/mateusjunges/laravel-kafka/pull/234)

## [2024-01-09 v1.13.6](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.6...v1.13.7)
* Add timeout feature for consumers by @mihaileu in [#233](https://github.com/mateusjunges/laravel-kafka/pull/233)
Expand Down
15 changes: 1 addition & 14 deletions docs/3-installation-and-setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,6 @@ return [
*/
'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),

/*
| Default security protocol
*/
'securityProtocol' => env('KAFKA_SECURITY_PROTOCOL', 'PLAINTEXT'),

/*
| Default sasl configuration
*/
'sasl' => [
'mechanisms' => env('KAFKA_MECHANISMS', 'PLAINTEXT'),
'username' => env('KAFKA_USERNAME', null),
'password' => env('KAFKA_PASSWORD', null)
],

/*
| Kafka consumers belonging to the same consumer group share a group id.
| The consumers in a group then divides the topic partitions as fairly amongst themselves as possible by
Expand Down Expand Up @@ -95,4 +81,5 @@ return [
*/
'cache_driver' => env('KAFKA_CACHE_DRIVER', env('CACHE_DRIVER', 'file')),
];

```
2 changes: 1 addition & 1 deletion docs/advanced-usage/2-graceful-shutdown.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ $consumer = Kafka::createConsumer(['topic'])
->build()
->onStopConsuming(static function () {
// Do something when the consumer stop consuming messages
});
})

$consumer->consume();
```
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: Class structure
weight: 9
weight: 10
---

Consumer classes are very simple, and it is basically a Laravel Command class. To get started, let's take a look at an example consumer.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: Queueable handlers
weight: 10
weight: 11
---

Queueable handlers allow you to handle your kafka messages in a queue. This will put a job into the Laravel queue system for each message received by your Kafka consumer.
Expand Down
31 changes: 31 additions & 0 deletions docs/consuming-messages/3-assigning-partitions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
title: Assigning consumers to a topic partition
weight: 3
---

Kafka clients allows you to implement your own partition assignment strategies for consumers.

If you have a topic with multiple consumers and want to assign a consumer to a specific partition topic, you can
use the `assignPartitions` method, available on the `ConsumerBuilder` instance:


```php
$partition = 1; // The partition number you want to assign

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
->assignPartitions([
new \RdKafka\TopicPartition('your-topic-name', $partition)
]);
```

The `assignPartitions` method accepts an array of `\RdKafka\TopicPartition` objects. You can assign multiple partitions to the same consumer
by adding more entries to the `assignPartitions` parameter:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
->assignPartitions([
new \RdKafka\TopicPartition('your-topic-name', 1)
new \RdKafka\TopicPartition('your-topic-name', 2)
new \RdKafka\TopicPartition('your-topic-name', 3)
]);
```
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: Consumer groups
weight: 3
weight: 4
---

Kafka consumers belonging to the same consumer group share a group id. The consumers in a group divides the topic partitions as fairly amongst themselves as possible by establishing that each partition is only consumed by a single consumer from the group.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: Message handlers
weight: 4
weight: 5
---

Now that you have created your kafka consumer, you must create a handler for the messages this consumer receives. By default, a consumer is any `callable`.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: Configuring consumer options
weight: 5
weight: 6
---

The `ConsumerBuilder` offers you some few configuration options:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: Custom deserializers
weight: 6
weight: 7
---

To create a custom deserializer, you need to create a class that implements the `\Junges\Kafka\Contracts\MessageDeserializer` contract.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: Consuming messages
weight: 7
weight: 8
---

After building the consumer, you must call the `consume` method to consume the messages:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: Handling message batch
weight: 8
weight: 9
---

If you want to handle multiple messages at once, you can build your consumer enabling batch settings.
Expand Down
13 changes: 13 additions & 0 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use JetBrains\PhpStorm\Pure;
use Junges\Kafka\Contracts\Consumer;
use Junges\Kafka\Contracts\HandlesBatchConfiguration;
use RdKafka\TopicPartition;

class Config
{
Expand Down Expand Up @@ -81,6 +82,7 @@ public function __construct(
private readonly array $beforeConsumingCallbacks = [],
private readonly array $afterConsumingCallbacks = [],
private readonly int $maxTime = 0,
private readonly array $partitionAssignment = [],
) {
}

Expand Down Expand Up @@ -212,4 +214,15 @@ public function shouldSendToDlq(): bool
{
return $this->dlq !== null;
}

public function shouldAssignTopicPartitions(): bool
{
return $this->getPartitionAssigment() !== [];
}

/** @return array<int, TopicPartition> */
public function getPartitionAssigment(): array
{
return $this->partitionAssignment;
}
}
10 changes: 9 additions & 1 deletion src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,15 @@ public function consume(): void

$this->committer = $this->committerFactory->make($this->consumer, $this->config);

$this->consumer->subscribe($this->config->getTopics());
// Calling `subscribe` overrides the assigned topic partitions, so we
// should check if there are any assignment defined before calling
// the subscribe method on the consumer. Partition assignment
// have precedence over topic subscriptions.
if ($this->config->shouldAssignTopicPartitions()) {
$this->consumer->assign($this->config->getPartitionAssigment());
} else {
$this->consumer->subscribe($this->config->getTopics());
}

$batchConfig = $this->config->getBatchConfig();

Expand Down
18 changes: 18 additions & 0 deletions src/Consumers/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Junges\Kafka\Contracts\Middleware;
use Junges\Kafka\Exceptions\ConsumerException;
use Junges\Kafka\Support\Timer;
use RdKafka\TopicPartition;

class ConsumerBuilder implements ConsumerBuilderContract
{
Expand Down Expand Up @@ -51,6 +52,9 @@ class ConsumerBuilder implements ConsumerBuilderContract
/** @var list<callable> */
protected array $afterConsumingCallbacks = [];

/** @var array<int, TopicPartition> */
protected array $partitionAssignment = [];

protected function __construct(protected string $brokers, array $topics = [], protected ?string $groupId = null)
{
if (count($topics) > 0) {
Expand Down Expand Up @@ -290,6 +294,19 @@ public function afterConsuming(callable $callable): self
return $this;
}

public function assignPartitions(array $partitionAssignment): self
{
foreach ($partitionAssignment as $assigment) {
if (! $assigment instanceof TopicPartition) {
throw new InvalidArgumentException('The partition assignment must be an instance of [\RdKafka\TopicPartition]');
}
}

$this->partitionAssignment = $partitionAssignment;

return $this;
}

/** @inheritDoc */
public function build(): MessageConsumer
{
Expand All @@ -312,6 +329,7 @@ public function build(): MessageConsumer
beforeConsumingCallbacks: $this->beforeConsumingCallbacks,
afterConsumingCallbacks: $this->afterConsumingCallbacks,
maxTime: $this->maxTime,
partitionAssignment: $this->partitionAssignment,
);

return new Consumer($config, $this->deserializer, $this->committerFactory);
Expand Down

0 comments on commit 389b06d

Please sign in to comment.