diff --git a/CHANGELOG.md b/CHANGELOG.md index d7d2f0f4..49f6ae74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ All relevant changes to `mateusjunges/laravel-kafka` will be documented here. - Add after and before consuming callbacks by @mateusjunges and @ebrahimradi in [#192](https://github.com/mateusjunges/laravel-kafka/pull/192) - Add simple events to kafka producer and consumers by @mateusjunges in [#193](https://github.com/mateusjunges/laravel-kafka/pull/193) - Append throwable/exception info when sending a message to DLQ by @mateusjunges in [#194](https://github.com/mateusjunges/laravel-kafka/pull/194) +- Add the ability to assign partitions to a consumer by @mateusjunges in [#234](https://github.com/mateusjunges/laravel-kafka/pull/234) ## [2024-01-09 v1.13.6](https://github.com/mateusjunges/laravel-kafka/compare/v1.13.6...v1.13.7) * Add timeout feature for consumers by @mihaileu in [#233](https://github.com/mateusjunges/laravel-kafka/pull/233) diff --git a/docs/3-installation-and-setup.md b/docs/3-installation-and-setup.md index 3fd6ac6f..d5100ee2 100644 --- a/docs/3-installation-and-setup.md +++ b/docs/3-installation-and-setup.md @@ -26,20 +26,6 @@ return [ */ 'brokers' => env('KAFKA_BROKERS', 'localhost:9092'), - /* - | Default security protocol - */ - 'securityProtocol' => env('KAFKA_SECURITY_PROTOCOL', 'PLAINTEXT'), - - /* - | Default sasl configuration - */ - 'sasl' => [ - 'mechanisms' => env('KAFKA_MECHANISMS', 'PLAINTEXT'), - 'username' => env('KAFKA_USERNAME', null), - 'password' => env('KAFKA_PASSWORD', null) - ], - /* | Kafka consumers belonging to the same consumer group share a group id. | The consumers in a group then divides the topic partitions as fairly amongst themselves as possible by @@ -95,4 +81,5 @@ return [ */ 'cache_driver' => env('KAFKA_CACHE_DRIVER', env('CACHE_DRIVER', 'file')), ]; + ``` diff --git a/docs/advanced-usage/2-graceful-shutdown.md b/docs/advanced-usage/2-graceful-shutdown.md index 6c857269..812b536f 100644 --- a/docs/advanced-usage/2-graceful-shutdown.md +++ b/docs/advanced-usage/2-graceful-shutdown.md @@ -19,7 +19,7 @@ $consumer = Kafka::createConsumer(['topic']) ->build() ->onStopConsuming(static function () { // Do something when the consumer stop consuming messages - }); + }) $consumer->consume(); ``` diff --git a/docs/consuming-messages/9-class-structure.md b/docs/consuming-messages/10-class-structure.md similarity index 99% rename from docs/consuming-messages/9-class-structure.md rename to docs/consuming-messages/10-class-structure.md index 8b152543..7b06cb2f 100644 --- a/docs/consuming-messages/9-class-structure.md +++ b/docs/consuming-messages/10-class-structure.md @@ -1,6 +1,6 @@ --- title: Class structure -weight: 9 +weight: 10 --- Consumer classes are very simple, and it is basically a Laravel Command class. To get started, let's take a look at an example consumer. diff --git a/docs/consuming-messages/10-queueable-handlers.md b/docs/consuming-messages/11-queueable-handlers.md similarity index 98% rename from docs/consuming-messages/10-queueable-handlers.md rename to docs/consuming-messages/11-queueable-handlers.md index f9168839..176cd750 100644 --- a/docs/consuming-messages/10-queueable-handlers.md +++ b/docs/consuming-messages/11-queueable-handlers.md @@ -1,6 +1,6 @@ --- title: Queueable handlers -weight: 10 +weight: 11 --- Queueable handlers allow you to handle your kafka messages in a queue. This will put a job into the Laravel queue system for each message received by your Kafka consumer. diff --git a/docs/consuming-messages/3-assigning-partitions.md b/docs/consuming-messages/3-assigning-partitions.md new file mode 100644 index 00000000..e2b29223 --- /dev/null +++ b/docs/consuming-messages/3-assigning-partitions.md @@ -0,0 +1,31 @@ +--- +title: Assigning consumers to a topic partition +weight: 3 +--- + +Kafka clients allows you to implement your own partition assignment strategies for consumers. + +If you have a topic with multiple consumers and want to assign a consumer to a specific partition topic, you can +use the `assignPartitions` method, available on the `ConsumerBuilder` instance: + + +```php +$partition = 1; // The partition number you want to assign + +$consumer = \Junges\Kafka\Facades\Kafka::createConsumer() + ->assignPartitions([ + new \RdKafka\TopicPartition('your-topic-name', $partition) + ]); +``` + +The `assignPartitions` method accepts an array of `\RdKafka\TopicPartition` objects. You can assign multiple partitions to the same consumer +by adding more entries to the `assignPartitions` parameter: + +```php +$consumer = \Junges\Kafka\Facades\Kafka::createConsumer() + ->assignPartitions([ + new \RdKafka\TopicPartition('your-topic-name', 1) + new \RdKafka\TopicPartition('your-topic-name', 2) + new \RdKafka\TopicPartition('your-topic-name', 3) + ]); +``` \ No newline at end of file diff --git a/docs/consuming-messages/3-consumer-groups.md b/docs/consuming-messages/4-consumer-groups.md similarity index 98% rename from docs/consuming-messages/3-consumer-groups.md rename to docs/consuming-messages/4-consumer-groups.md index 4561430c..27baa2af 100644 --- a/docs/consuming-messages/3-consumer-groups.md +++ b/docs/consuming-messages/4-consumer-groups.md @@ -1,6 +1,6 @@ --- title: Consumer groups -weight: 3 +weight: 4 --- Kafka consumers belonging to the same consumer group share a group id. The consumers in a group divides the topic partitions as fairly amongst themselves as possible by establishing that each partition is only consumed by a single consumer from the group. diff --git a/docs/consuming-messages/4-message-handlers.md b/docs/consuming-messages/5-message-handlers.md similarity index 99% rename from docs/consuming-messages/4-message-handlers.md rename to docs/consuming-messages/5-message-handlers.md index 2e418a14..71c33eab 100644 --- a/docs/consuming-messages/4-message-handlers.md +++ b/docs/consuming-messages/5-message-handlers.md @@ -1,6 +1,6 @@ --- title: Message handlers -weight: 4 +weight: 5 --- Now that you have created your kafka consumer, you must create a handler for the messages this consumer receives. By default, a consumer is any `callable`. diff --git a/docs/consuming-messages/5-configuring-consumer-options.md b/docs/consuming-messages/6-configuring-consumer-options.md similarity index 99% rename from docs/consuming-messages/5-configuring-consumer-options.md rename to docs/consuming-messages/6-configuring-consumer-options.md index 521c409e..0640d83a 100644 --- a/docs/consuming-messages/5-configuring-consumer-options.md +++ b/docs/consuming-messages/6-configuring-consumer-options.md @@ -1,6 +1,6 @@ --- title: Configuring consumer options -weight: 5 +weight: 6 --- The `ConsumerBuilder` offers you some few configuration options: diff --git a/docs/consuming-messages/6-custom-deserializers.md b/docs/consuming-messages/7-custom-deserializers.md similarity index 99% rename from docs/consuming-messages/6-custom-deserializers.md rename to docs/consuming-messages/7-custom-deserializers.md index 17170ef8..702f335a 100644 --- a/docs/consuming-messages/6-custom-deserializers.md +++ b/docs/consuming-messages/7-custom-deserializers.md @@ -1,6 +1,6 @@ --- title: Custom deserializers -weight: 6 +weight: 7 --- To create a custom deserializer, you need to create a class that implements the `\Junges\Kafka\Contracts\MessageDeserializer` contract. diff --git a/docs/consuming-messages/7-consuming-messages.md b/docs/consuming-messages/8-consuming-messages.md similarity index 97% rename from docs/consuming-messages/7-consuming-messages.md rename to docs/consuming-messages/8-consuming-messages.md index 5929b1c3..154197a1 100644 --- a/docs/consuming-messages/7-consuming-messages.md +++ b/docs/consuming-messages/8-consuming-messages.md @@ -1,6 +1,6 @@ --- title: Consuming messages -weight: 7 +weight: 8 --- After building the consumer, you must call the `consume` method to consume the messages: diff --git a/docs/consuming-messages/8-handling-message-batch.md b/docs/consuming-messages/9-handling-message-batch.md similarity index 98% rename from docs/consuming-messages/8-handling-message-batch.md rename to docs/consuming-messages/9-handling-message-batch.md index 72eb6e61..82b95cc4 100644 --- a/docs/consuming-messages/8-handling-message-batch.md +++ b/docs/consuming-messages/9-handling-message-batch.md @@ -1,6 +1,6 @@ --- title: Handling message batch -weight: 8 +weight: 9 --- If you want to handle multiple messages at once, you can build your consumer enabling batch settings. diff --git a/src/Config/Config.php b/src/Config/Config.php index e24e24d0..08f7a527 100644 --- a/src/Config/Config.php +++ b/src/Config/Config.php @@ -5,6 +5,7 @@ use JetBrains\PhpStorm\Pure; use Junges\Kafka\Contracts\Consumer; use Junges\Kafka\Contracts\HandlesBatchConfiguration; +use RdKafka\TopicPartition; class Config { @@ -81,6 +82,7 @@ public function __construct( private readonly array $beforeConsumingCallbacks = [], private readonly array $afterConsumingCallbacks = [], private readonly int $maxTime = 0, + private readonly array $partitionAssignment = [], ) { } @@ -212,4 +214,15 @@ public function shouldSendToDlq(): bool { return $this->dlq !== null; } + + public function shouldAssignTopicPartitions(): bool + { + return $this->getPartitionAssigment() !== []; + } + + /** @return array */ + public function getPartitionAssigment(): array + { + return $this->partitionAssignment; + } } diff --git a/src/Consumers/Consumer.php b/src/Consumers/Consumer.php index bd274969..1aa67ef6 100644 --- a/src/Consumers/Consumer.php +++ b/src/Consumers/Consumer.php @@ -98,7 +98,15 @@ public function consume(): void $this->committer = $this->committerFactory->make($this->consumer, $this->config); - $this->consumer->subscribe($this->config->getTopics()); + // Calling `subscribe` overrides the assigned topic partitions, so we + // should check if there are any assignment defined before calling + // the subscribe method on the consumer. Partition assignment + // have precedence over topic subscriptions. + if ($this->config->shouldAssignTopicPartitions()) { + $this->consumer->assign($this->config->getPartitionAssigment()); + } else { + $this->consumer->subscribe($this->config->getTopics()); + } $batchConfig = $this->config->getBatchConfig(); diff --git a/src/Consumers/ConsumerBuilder.php b/src/Consumers/ConsumerBuilder.php index b5512140..0e0adfd9 100644 --- a/src/Consumers/ConsumerBuilder.php +++ b/src/Consumers/ConsumerBuilder.php @@ -18,6 +18,7 @@ use Junges\Kafka\Contracts\Middleware; use Junges\Kafka\Exceptions\ConsumerException; use Junges\Kafka\Support\Timer; +use RdKafka\TopicPartition; class ConsumerBuilder implements ConsumerBuilderContract { @@ -51,6 +52,9 @@ class ConsumerBuilder implements ConsumerBuilderContract /** @var list */ protected array $afterConsumingCallbacks = []; + /** @var array */ + protected array $partitionAssignment = []; + protected function __construct(protected string $brokers, array $topics = [], protected ?string $groupId = null) { if (count($topics) > 0) { @@ -290,6 +294,19 @@ public function afterConsuming(callable $callable): self return $this; } + public function assignPartitions(array $partitionAssignment): self + { + foreach ($partitionAssignment as $assigment) { + if (! $assigment instanceof TopicPartition) { + throw new InvalidArgumentException('The partition assignment must be an instance of [\RdKafka\TopicPartition]'); + } + } + + $this->partitionAssignment = $partitionAssignment; + + return $this; + } + /** @inheritDoc */ public function build(): MessageConsumer { @@ -312,6 +329,7 @@ public function build(): MessageConsumer beforeConsumingCallbacks: $this->beforeConsumingCallbacks, afterConsumingCallbacks: $this->afterConsumingCallbacks, maxTime: $this->maxTime, + partitionAssignment: $this->partitionAssignment, ); return new Consumer($config, $this->deserializer, $this->committerFactory);