diff --git a/src/Console/Commands/KafkaConsumer/Options.php b/src/Console/Commands/KafkaConsumer/Options.php index 805054fb..a0634fd6 100644 --- a/src/Console/Commands/KafkaConsumer/Options.php +++ b/src/Console/Commands/KafkaConsumer/Options.php @@ -97,6 +97,10 @@ public function getBroker(): string return $this->config['broker_connections'][$this->brokerConnection]; } + if (!isset($this->config['default'])) { + return $this->consumer['brokers']; + } + return $this->config['broker_connections'][$this->config['default']]; } } diff --git a/src/Kafka.php b/src/Kafka.php index 2a454d2b..ba35ddf9 100644 --- a/src/Kafka.php +++ b/src/Kafka.php @@ -19,9 +19,11 @@ class Kafka implements CanPublishMessagesToKafka, CanConsumeMessagesFromKafka */ public function publishOn(string $topic, string $broker = null): CanProduceMessages { + $defaultBrokers = config('kafka.broker_connections.' . config('kafka.default')) ?? config('kafka.brokers'); + return new ProducerBuilder( topic: $topic, - broker: $broker ?? config('kafka.broker_connections.' . config('kafka.default')) + broker: $broker ?? $defaultBrokers ); } @@ -35,8 +37,10 @@ public function publishOn(string $topic, string $broker = null): CanProduceMessa */ public function createConsumer(array $topics = [], string $groupId = null, string $brokers = null): ConsumerBuilder { + $defaultBrokers = config('kafka.broker_connections.' . config('kafka.default')) ?? config('kafka.brokers'); + return ConsumerBuilder::create( - brokers: $brokers ?? config('kafka.broker_connections.' . config('kafka.default')), + brokers: $brokers ?? $defaultBrokers, topics: $topics, groupId: $groupId ?? config('kafka.consumer_group_id') ); diff --git a/src/Producers/ProducerBuilder.php b/src/Producers/ProducerBuilder.php index 0beb67d6..6bb7aa4d 100644 --- a/src/Producers/ProducerBuilder.php +++ b/src/Producers/ProducerBuilder.php @@ -27,8 +27,10 @@ public function __construct( $message = app(KafkaProducerMessage::class); $this->message = $message->create($topic); $this->serializer = app(MessageSerializer::class); + $defaultConnection = config('kafka.default'); - $this->broker = $broker ?? config('kafka.broker_connections.' . $defaultConnection); + $defaultBrokers = config('kafka.broker_connections.' . $defaultConnection) ?? config('kafka.brokers'); + $this->broker = $broker ?? $defaultBrokers; } /**