Skip to content

Commit

Permalink
feat(inputs.amqp_consumer) : enable queue_arguments configuration and…
Browse files Browse the repository at this point in the history
… forward it to QueueDeclare*()
  • Loading branch information
Robin Lucbernet committed Nov 5, 2024
1 parent 115df09 commit 6aaf3b6
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type AMQPConsumer struct {
Queue string `toml:"queue"`
QueueDurability string `toml:"queue_durability"`
QueuePassive bool `toml:"queue_passive"`
QueueArguments map[string]int `toml:"queue_arguments"`
QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"`
BindingKey string `toml:"binding_key"`
PrefetchCount int `toml:"prefetch_count"`
Expand Down Expand Up @@ -369,14 +370,19 @@ func (a *AMQPConsumer) declareQueue(channel *amqp.Channel) (*amqp.Queue, error)
queueDurable = false
}

queueArgs := make(amqp.Table, len(a.QueueArguments))
for k, v := range a.QueueArguments {
queueArgs[k] = v
}

if a.QueuePassive {
queue, err = channel.QueueDeclarePassive(
a.Queue, // queue
queueDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
queueArgs, // arguments
)
} else {
queue, err = channel.QueueDeclare(
Expand All @@ -385,7 +391,7 @@ func (a *AMQPConsumer) declareQueue(channel *amqp.Channel) (*amqp.Queue, error)
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
queueArgs, // arguments
)
}
if err != nil {
Expand Down

0 comments on commit 6aaf3b6

Please sign in to comment.