diff --git a/internal/events/rabbitmq/consumer/declares.go b/internal/events/rabbitmq/consumer/declares.go index 840316e..8311afd 100644 --- a/internal/events/rabbitmq/consumer/declares.go +++ b/internal/events/rabbitmq/consumer/declares.go @@ -1,79 +1,28 @@ package consumer import ( - "math" - "strings" - "time" - "github.com/rabbitmq/amqp091-go" "github.com/rotisserie/eris" ) func (r *rabbitmqConsumer) declare(routingKeys []string) error { - err := r.declareAttempt(routingKeys, true) - if err == nil { - // If there is no error, then the declaration was successful - r.logger.Info().Msg("successfully declared queues with DLX") - return nil - } + r.chManager.ChannelMux.RLock() + defer r.chManager.ChannelMux.RUnlock() - err = r.closeChannelAndWaitForReconnect() + dlxName := r.config.QueueName + "_dlx" + err := r.deadLetterDeclare(dlxName) if err != nil { - return eris.Wrap(err, "closeChannelAndWaitForReconnect") + return eris.Wrap(err, "failed to declare dead letter") } - err = r.declareAttempt(routingKeys, false) + err = r.queueDeclare(dlxName) if err != nil { - return eris.Wrap(err, "declareAttempt - second attempt") - } - - r.logger.Info().Msg("successfully declared queues without DLX") - return nil -} - -func (r *rabbitmqConsumer) closeChannelAndWaitForReconnect() error { - // Wait for the channel to close - this is a workaround for the channel not closing immediately - // Check if the channel is open with backoff - - r.chManager.Channel.Close() - time.Sleep(2 * time.Second) - - const maxAttempts = 5 - const backoffFactor = 2 - for i := 1; i <= maxAttempts; i++ { - if r.chManager.Channel.IsClosed() { - x := math.Pow(backoffFactor, float64(i)) - time.Sleep(time.Duration(x) * time.Second) - - i++ - } - } - if r.chManager.Channel.IsClosed() { - return eris.New("failed to reconnect to the amqp channel in time for subscriber") - } - - return nil -} - -func (r *rabbitmqConsumer) declareAttempt(routingKeys []string, useDLX bool) error { - r.chManager.ChannelMux.RLock() - defer r.chManager.ChannelMux.RUnlock() - - if useDLX { - err := r.declareQueuesWithDLX() - if err != nil { - return eris.Wrap(err, "declareQueuesWithDLX") - } - } else { - err := r.declareQueuesWithoutDLX() - if err != nil { - return eris.Wrap(err, "declareQueuesWithoutDLX") - } + return eris.Wrap(err, "failed to declare queue") } - err := r.queueBindDeclare(routingKeys) + err = r.queueBindDeclare(routingKeys) if err != nil { - return eris.Wrap(err, "queueBindDeclare") + return eris.Wrap(err, "failed to declare queue bind") } err = r.chManager.Channel.Qos( @@ -86,31 +35,7 @@ func (r *rabbitmqConsumer) declareAttempt(routingKeys []string, useDLX bool) err return nil } -func (r *rabbitmqConsumer) declareQueuesWithDLX() error { - dlxName := r.config.QueueName + "_dlx" - err := r.deadLetterDeclare(dlxName) - if err != nil { - return eris.Wrap(err, "deadLetterDeclare") - } - - err = r.queueDeclare(&dlxName) - if err != nil { - return eris.Wrap(err, "queueDeclare") - } - - return nil -} - -func (r *rabbitmqConsumer) declareQueuesWithoutDLX() error { - err := r.queueDeclare(nil) - if err != nil { - return eris.Wrap(err, "queueDeclare") - } - - return nil -} - -func (r *rabbitmqConsumer) queueDeclare(dlxName *string) error { +func (r *rabbitmqConsumer) queueDeclare(dlxName string) error { err := r.chManager.Channel.ExchangeDeclare( r.config.ExchangeName, "topic", @@ -124,32 +49,18 @@ func (r *rabbitmqConsumer) queueDeclare(dlxName *string) error { return eris.Wrap(err, "failed to declare exchange") } - args := amqp091.Table{ - amqp091.QueueTypeArg: amqp091.QueueTypeQuorum, - } - if dlxName != nil && *dlxName != "" { - args["x-dead-letter-exchange"] = dlxName - } - _, err = r.chManager.Channel.QueueDeclare( r.config.QueueName, true, false, false, false, - args, + amqp091.Table{ + "x-queue-type": "quorum", + "x-dead-letter-exchange": dlxName, + }, ) if err != nil { - if strings.Contains(err.Error(), "PRECONDITION_FAILED") { - // Queue already exists - no need to redeclare - // RabbitMQ will force close, we want to wait for the channel to open again - err = r.closeChannelAndWaitForReconnect() - if err != nil { - return eris.Wrap(err, "closeChannelAndWaitForReconnect") - } - - return nil - } return eris.Wrap(err, "failed to declare queue") } @@ -184,7 +95,7 @@ func (r *rabbitmqConsumer) deadLetterDeclare(dlxName string) error { nil, ) if err != nil { - return eris.Wrap(err, "failed to declare DLX exchange") + return eris.Wrap(err, "failed to declare exchange") } _, err = r.chManager.Channel.QueueDeclare( @@ -199,7 +110,7 @@ func (r *rabbitmqConsumer) deadLetterDeclare(dlxName string) error { }, ) if err != nil { - return eris.Wrap(err, "failed to declare DLX queue") + return eris.Wrap(err, "failed to declare queue") } err = r.chManager.Channel.QueueBind( @@ -210,7 +121,7 @@ func (r *rabbitmqConsumer) deadLetterDeclare(dlxName string) error { nil, ) if err != nil { - return eris.Wrap(err, "failed to bind DLX queue") + return eris.Wrap(err, "failed to bind queue") } return nil