Skip to content

Commit

Permalink
Do not override topic when publishing batch messages (#313)
Browse files Browse the repository at this point in the history
* Do not override topic when publishing batch messages

* Update docs
  • Loading branch information
mateusjunges authored Aug 18, 2024
1 parent 5fa0f1b commit 2d2d20c
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 15 deletions.
10 changes: 8 additions & 2 deletions docs/producing-messages/6-producing-message-batch-to-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use Junges\Kafka\Message\Message;
$message = new Message(
headers: ['header-key' => 'header-value'],
body: ['key' => 'value'],
key: 'kafka key here'
key: 'kafka key here',
topicName: 'my_topic'
)

$messageBatch = new MessageBatch();
Expand All @@ -35,4 +36,9 @@ $producer = Kafka::publish('broker')
->withConfigOptions(['key' => 'value']);

$producer->sendBatch($messageBatch);
```
```

When producing batch messages, you can specify the topic for each message that you want to publish. If you want to publish all messages in the same topic,
you can use the `onTopic` method on the `MessageBatch` class to specify the topic once for all messages. Please note that
if a message within the batch specifies a topic, we will use that topic to publish the message, instead of the topic defined on the
batch itself.
38 changes: 32 additions & 6 deletions src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ public function produce(ProducerMessage $message): bool
/** @inheritDoc */
public function produceBatch(MessageBatch $messageBatch): int
{
if ($messageBatch->getTopicName() === '') {
throw CouldNotPublishMessageBatch::invalidTopicName($messageBatch->getTopicName());
}

$topic = $this->producer->newTopic($messageBatch->getTopicName());
$this->assertTopicWasSetForAllBatchMessages($messageBatch);

$messagesIterator = $messageBatch->getMessages();

Expand All @@ -94,7 +90,13 @@ public function produceBatch(MessageBatch $messageBatch): int

foreach ($messagesIterator as $message) {
assert($message instanceof Message);
$message->onTopic($messageBatch->getTopicName());

if ($message->getTopicName() === null) {
$message->onTopic($messageBatch->getTopicName());
}

$topic = $this->producer->newTopic($message->getTopicName());

$message = $this->serializer->serialize($message);

$this->produceMessageBatch($topic, $message, $messageBatch->getBatchUuid());
Expand All @@ -111,6 +113,30 @@ public function produceBatch(MessageBatch $messageBatch): int
return $produced;
}

/** @throws CouldNotPublishMessageBatch */
private function assertTopicWasSetForAllBatchMessages(MessageBatch $batch): void
{
// If the message batch has a topic set, we can return here because
// we can use that topic as a fallback in case not all batch
// messages have a specific topic to be used.
if ($batch->getTopicName() !== '') {
return;
}

$messagesIterator = $batch->getMessages();

foreach ($messagesIterator as $message) {
assert($message instanceof Message);

// If the batch does not have a topic defined, we check if the message
// itself has specified a topic in which it should be published.
// If it does not, then we throw an exception.
if ($message->getTopicName() === '' || $message->getTopicName() === null) {
throw CouldNotPublishMessageBatch::invalidTopicName($message->getTopicName() ?? '');
}
}
}

private function produceMessage(ProducerTopic $topic, ProducerMessage $message): void
{
$topic->producev(
Expand Down
31 changes: 24 additions & 7 deletions tests/KafkaTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
use Junges\Kafka\Events\MessagePublished;
use Junges\Kafka\Events\PublishingMessageBatch;
use Junges\Kafka\Exceptions\CouldNotPublishMessage;
use Junges\Kafka\Exceptions\CouldNotPublishMessageBatch;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
use Junges\Kafka\Message\Serializers\JsonSerializer;
use Junges\Kafka\Producers\Builder as ProducerBuilder;
use Junges\Kafka\Producers\MessageBatch;
use Mockery as m;
use PHPUnit\Framework\Attributes\Test;
use RdKafka\Producer;
use RdKafka\ProducerTopic;

Expand Down Expand Up @@ -285,10 +287,10 @@ public function testProducerThrowsExceptionIfMessageCouldNotBePublished(): void

public function testSendMessageBatch(): void
{
$messageBatch = (new MessageBatch())->onTopic('test');
$messageBatch->push(new Message('test'));
$messageBatch->push(new Message('test'));
$messageBatch->push(new Message('test'));
$messageBatch = new MessageBatch;
$messageBatch->push(new Message('test_1'));
$messageBatch->push(new Message('test_2'));
$messageBatch->push(new Message('test_3'));

$expectedUuid = $messageBatch->getBatchUuid();

Expand All @@ -299,8 +301,9 @@ public function testSendMessageBatch(): void
->getMock();

$mockedProducer = m::mock(Producer::class)
->shouldReceive('newTopic')
->andReturn($mockedProducerTopic)
->shouldReceive('newTopic')->with('test_1')->once()->andReturn($mockedProducerTopic)
->shouldReceive('newTopic')->with('test_2')->once()->andReturn($mockedProducerTopic)
->shouldReceive('newTopic')->with('test_3')->once()->andReturn($mockedProducerTopic)
->shouldReceive('poll')
->times($messageBatch->getMessages()->count())
->shouldReceive('flush')
Expand All @@ -314,7 +317,7 @@ public function testSendMessageBatch(): void

Event::fake();

Kafka::publish()->withBodyKey('foo', 'bar')->onTopic('test')->sendBatch($messageBatch);
Kafka::publish()->withBodyKey('foo', 'bar')->sendBatch($messageBatch);

Event::assertDispatched(PublishingMessageBatch::class, function (PublishingMessageBatch $event) use ($messageBatch) {
return $event->batch === $messageBatch;
Expand All @@ -329,6 +332,20 @@ public function testSendMessageBatch(): void
});
}

#[Test]
public function it_throws_an_exception_if_there_is_a_message_in_batch_with_no_topic_specified(): void
{
$messageBatch = new MessageBatch;
$messageBatch->push(new Message('test_1'));
$messageBatch->push(new Message('test_2'));
$messageBatch->push(new Message);

$this->expectException(CouldNotPublishMessageBatch::class);
$this->expectExceptionMessage("The provided topic name [''] is invalid for the message batch. Try again with a valid topic name.");

Kafka::publish()->sendBatch($messageBatch);
}

public function testMacro(): void
{
$sasl = new Sasl(username: 'username', password: 'password', mechanisms: 'mechanisms');
Expand Down

0 comments on commit 2d2d20c

Please sign in to comment.