diff --git a/docs/producing-messages/6-producing-message-batch-to-kafka.md b/docs/producing-messages/6-producing-message-batch-to-kafka.md index 32fb5f1..483adac 100644 --- a/docs/producing-messages/6-producing-message-batch-to-kafka.md +++ b/docs/producing-messages/6-producing-message-batch-to-kafka.md @@ -20,7 +20,8 @@ use Junges\Kafka\Message\Message; $message = new Message( headers: ['header-key' => 'header-value'], body: ['key' => 'value'], - key: 'kafka key here' + key: 'kafka key here', + topicName: 'my_topic' ) $messageBatch = new MessageBatch(); @@ -35,4 +36,9 @@ $producer = Kafka::publish('broker') ->withConfigOptions(['key' => 'value']); $producer->sendBatch($messageBatch); -``` \ No newline at end of file +``` + +When producing batch messages, you can specify the topic for each message that you want to publish. If you want to publish all messages in the same topic, +you can use the `onTopic` method on the `MessageBatch` class to specify the topic once for all messages. Please note that +if a message within the batch specifies a topic, we will use that topic to publish the message, instead of the topic defined on the +batch itself. \ No newline at end of file diff --git a/src/Producers/Producer.php b/src/Producers/Producer.php index e9de95c..b851dac 100644 --- a/src/Producers/Producer.php +++ b/src/Producers/Producer.php @@ -78,11 +78,7 @@ public function produce(ProducerMessage $message): bool /** @inheritDoc */ public function produceBatch(MessageBatch $messageBatch): int { - if ($messageBatch->getTopicName() === '') { - throw CouldNotPublishMessageBatch::invalidTopicName($messageBatch->getTopicName()); - } - - $topic = $this->producer->newTopic($messageBatch->getTopicName()); + $this->assertTopicWasSetForAllBatchMessages($messageBatch); $messagesIterator = $messageBatch->getMessages(); @@ -94,7 +90,13 @@ public function produceBatch(MessageBatch $messageBatch): int foreach ($messagesIterator as $message) { assert($message instanceof Message); - $message->onTopic($messageBatch->getTopicName()); + + if ($message->getTopicName() === null) { + $message->onTopic($messageBatch->getTopicName()); + } + + $topic = $this->producer->newTopic($message->getTopicName()); + $message = $this->serializer->serialize($message); $this->produceMessageBatch($topic, $message, $messageBatch->getBatchUuid()); @@ -111,6 +113,30 @@ public function produceBatch(MessageBatch $messageBatch): int return $produced; } + /** @throws CouldNotPublishMessageBatch */ + private function assertTopicWasSetForAllBatchMessages(MessageBatch $batch): void + { + // If the message batch has a topic set, we can return here because + // we can use that topic as a fallback in case not all batch + // messages have a specific topic to be used. + if ($batch->getTopicName() !== '') { + return; + } + + $messagesIterator = $batch->getMessages(); + + foreach ($messagesIterator as $message) { + assert($message instanceof Message); + + // If the batch does not have a topic defined, we check if the message + // itself has specified a topic in which it should be published. + // If it does not, then we throw an exception. + if ($message->getTopicName() === '' || $message->getTopicName() === null) { + throw CouldNotPublishMessageBatch::invalidTopicName($message->getTopicName() ?? ''); + } + } + } + private function produceMessage(ProducerTopic $topic, ProducerMessage $message): void { $topic->producev( diff --git a/tests/KafkaTest.php b/tests/KafkaTest.php index e3adf12..35408e9 100644 --- a/tests/KafkaTest.php +++ b/tests/KafkaTest.php @@ -13,12 +13,14 @@ use Junges\Kafka\Events\MessagePublished; use Junges\Kafka\Events\PublishingMessageBatch; use Junges\Kafka\Exceptions\CouldNotPublishMessage; +use Junges\Kafka\Exceptions\CouldNotPublishMessageBatch; use Junges\Kafka\Facades\Kafka; use Junges\Kafka\Message\Message; use Junges\Kafka\Message\Serializers\JsonSerializer; use Junges\Kafka\Producers\Builder as ProducerBuilder; use Junges\Kafka\Producers\MessageBatch; use Mockery as m; +use PHPUnit\Framework\Attributes\Test; use RdKafka\Producer; use RdKafka\ProducerTopic; @@ -285,10 +287,10 @@ public function testProducerThrowsExceptionIfMessageCouldNotBePublished(): void public function testSendMessageBatch(): void { - $messageBatch = (new MessageBatch())->onTopic('test'); - $messageBatch->push(new Message('test')); - $messageBatch->push(new Message('test')); - $messageBatch->push(new Message('test')); + $messageBatch = new MessageBatch; + $messageBatch->push(new Message('test_1')); + $messageBatch->push(new Message('test_2')); + $messageBatch->push(new Message('test_3')); $expectedUuid = $messageBatch->getBatchUuid(); @@ -299,8 +301,9 @@ public function testSendMessageBatch(): void ->getMock(); $mockedProducer = m::mock(Producer::class) - ->shouldReceive('newTopic') - ->andReturn($mockedProducerTopic) + ->shouldReceive('newTopic')->with('test_1')->once()->andReturn($mockedProducerTopic) + ->shouldReceive('newTopic')->with('test_2')->once()->andReturn($mockedProducerTopic) + ->shouldReceive('newTopic')->with('test_3')->once()->andReturn($mockedProducerTopic) ->shouldReceive('poll') ->times($messageBatch->getMessages()->count()) ->shouldReceive('flush') @@ -314,7 +317,7 @@ public function testSendMessageBatch(): void Event::fake(); - Kafka::publish()->withBodyKey('foo', 'bar')->onTopic('test')->sendBatch($messageBatch); + Kafka::publish()->withBodyKey('foo', 'bar')->sendBatch($messageBatch); Event::assertDispatched(PublishingMessageBatch::class, function (PublishingMessageBatch $event) use ($messageBatch) { return $event->batch === $messageBatch; @@ -329,6 +332,20 @@ public function testSendMessageBatch(): void }); } + #[Test] + public function it_throws_an_exception_if_there_is_a_message_in_batch_with_no_topic_specified(): void + { + $messageBatch = new MessageBatch; + $messageBatch->push(new Message('test_1')); + $messageBatch->push(new Message('test_2')); + $messageBatch->push(new Message); + + $this->expectException(CouldNotPublishMessageBatch::class); + $this->expectExceptionMessage("The provided topic name [''] is invalid for the message batch. Try again with a valid topic name."); + + Kafka::publish()->sendBatch($messageBatch); + } + public function testMacro(): void { $sasl = new Sasl(username: 'username', password: 'password', mechanisms: 'mechanisms');