Skip to content

Commit

Permalink
reverts changes
Browse files Browse the repository at this point in the history
  • Loading branch information
TheRafaBonin committed Sep 25, 2024
1 parent b82d103 commit 24a487a
Showing 1 changed file with 17 additions and 106 deletions.
123 changes: 17 additions & 106 deletions internal/events/rabbitmq/consumer/declares.go
Original file line number Diff line number Diff line change
@@ -1,79 +1,28 @@
package consumer

import (
"math"
"strings"
"time"

"github.com/rabbitmq/amqp091-go"
"github.com/rotisserie/eris"
)

func (r *rabbitmqConsumer) declare(routingKeys []string) error {
err := r.declareAttempt(routingKeys, true)
if err == nil {
// If there is no error, then the declaration was successful
r.logger.Info().Msg("successfully declared queues with DLX")
return nil
}
r.chManager.ChannelMux.RLock()
defer r.chManager.ChannelMux.RUnlock()

err = r.closeChannelAndWaitForReconnect()
dlxName := r.config.QueueName + "_dlx"
err := r.deadLetterDeclare(dlxName)
if err != nil {
return eris.Wrap(err, "closeChannelAndWaitForReconnect")
return eris.Wrap(err, "failed to declare dead letter")
}

err = r.declareAttempt(routingKeys, false)
err = r.queueDeclare(dlxName)
if err != nil {
return eris.Wrap(err, "declareAttempt - second attempt")
}

r.logger.Info().Msg("successfully declared queues without DLX")
return nil
}

func (r *rabbitmqConsumer) closeChannelAndWaitForReconnect() error {
// Wait for the channel to close - this is a workaround for the channel not closing immediately
// Check if the channel is open with backoff

r.chManager.Channel.Close()
time.Sleep(2 * time.Second)

const maxAttempts = 5
const backoffFactor = 2
for i := 1; i <= maxAttempts; i++ {
if r.chManager.Channel.IsClosed() {
x := math.Pow(backoffFactor, float64(i))
time.Sleep(time.Duration(x) * time.Second)

i++
}
}
if r.chManager.Channel.IsClosed() {
return eris.New("failed to reconnect to the amqp channel in time for subscriber")
}

return nil
}

func (r *rabbitmqConsumer) declareAttempt(routingKeys []string, useDLX bool) error {
r.chManager.ChannelMux.RLock()
defer r.chManager.ChannelMux.RUnlock()

if useDLX {
err := r.declareQueuesWithDLX()
if err != nil {
return eris.Wrap(err, "declareQueuesWithDLX")
}
} else {
err := r.declareQueuesWithoutDLX()
if err != nil {
return eris.Wrap(err, "declareQueuesWithoutDLX")
}
return eris.Wrap(err, "failed to declare queue")
}

err := r.queueBindDeclare(routingKeys)
err = r.queueBindDeclare(routingKeys)
if err != nil {
return eris.Wrap(err, "queueBindDeclare")
return eris.Wrap(err, "failed to declare queue bind")
}

err = r.chManager.Channel.Qos(
Expand All @@ -86,31 +35,7 @@ func (r *rabbitmqConsumer) declareAttempt(routingKeys []string, useDLX bool) err
return nil
}

func (r *rabbitmqConsumer) declareQueuesWithDLX() error {
dlxName := r.config.QueueName + "_dlx"
err := r.deadLetterDeclare(dlxName)
if err != nil {
return eris.Wrap(err, "deadLetterDeclare")
}

err = r.queueDeclare(&dlxName)
if err != nil {
return eris.Wrap(err, "queueDeclare")
}

return nil
}

func (r *rabbitmqConsumer) declareQueuesWithoutDLX() error {
err := r.queueDeclare(nil)
if err != nil {
return eris.Wrap(err, "queueDeclare")
}

return nil
}

func (r *rabbitmqConsumer) queueDeclare(dlxName *string) error {
func (r *rabbitmqConsumer) queueDeclare(dlxName string) error {
err := r.chManager.Channel.ExchangeDeclare(
r.config.ExchangeName,
"topic",
Expand All @@ -124,32 +49,18 @@ func (r *rabbitmqConsumer) queueDeclare(dlxName *string) error {
return eris.Wrap(err, "failed to declare exchange")
}

args := amqp091.Table{
amqp091.QueueTypeArg: amqp091.QueueTypeQuorum,
}
if dlxName != nil && *dlxName != "" {
args["x-dead-letter-exchange"] = dlxName
}

_, err = r.chManager.Channel.QueueDeclare(
r.config.QueueName,
true,
false,
false,
false,
args,
amqp091.Table{
"x-queue-type": "quorum",
"x-dead-letter-exchange": dlxName,
},
)
if err != nil {
if strings.Contains(err.Error(), "PRECONDITION_FAILED") {
// Queue already exists - no need to redeclare
// RabbitMQ will force close, we want to wait for the channel to open again
err = r.closeChannelAndWaitForReconnect()
if err != nil {
return eris.Wrap(err, "closeChannelAndWaitForReconnect")
}

return nil
}
return eris.Wrap(err, "failed to declare queue")
}

Expand Down Expand Up @@ -184,7 +95,7 @@ func (r *rabbitmqConsumer) deadLetterDeclare(dlxName string) error {
nil,
)
if err != nil {
return eris.Wrap(err, "failed to declare DLX exchange")
return eris.Wrap(err, "failed to declare exchange")
}

_, err = r.chManager.Channel.QueueDeclare(
Expand All @@ -199,7 +110,7 @@ func (r *rabbitmqConsumer) deadLetterDeclare(dlxName string) error {
},
)
if err != nil {
return eris.Wrap(err, "failed to declare DLX queue")
return eris.Wrap(err, "failed to declare queue")
}

err = r.chManager.Channel.QueueBind(
Expand All @@ -210,7 +121,7 @@ func (r *rabbitmqConsumer) deadLetterDeclare(dlxName string) error {
nil,
)
if err != nil {
return eris.Wrap(err, "failed to bind DLX queue")
return eris.Wrap(err, "failed to bind queue")
}

return nil
Expand Down

0 comments on commit 24a487a

Please sign in to comment.