diff --git a/shared/queue/queue.go b/shared/queue/queue.go index c762dd6..7a60031 100644 --- a/shared/queue/queue.go +++ b/shared/queue/queue.go @@ -9,6 +9,8 @@ import ( log "github.com/sirupsen/logrus" ) +const maxPriority uint8 = 5 + type Queue struct { channel *amqp.Channel queue amqp.Queue @@ -38,13 +40,17 @@ func Init(queueName, url string) (*Queue, error) { return nil, err } + args := amqp.Table{ + "x-max-priority": maxPriority, // Set the max priority level + } + q.queue, err = q.channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait - nil, // arguments + args, // arguments ) if err != nil { log.WithError(err).Error("error declaring RabbitMQ queue") @@ -57,6 +63,11 @@ func Init(queueName, url string) (*Queue, error) { func (q *Queue) Enqueue(ctx context.Context, msg payload.QueuePayload, prio uint8) error { log.Debugf("Enqueue function called with ctx %+v message: %v", ctx, msg) + if prio > maxPriority { + log.Warnf("Priority %d is higher than max priority %d. Setting to max priority", prio, maxPriority) + prio = maxPriority + } + body, err := json.Marshal(msg) if err != nil { log.WithError(err).Error("error marshalling message") @@ -69,7 +80,7 @@ func (q *Queue) Enqueue(ctx context.Context, msg payload.QueuePayload, prio uint false, // mandatory false, // immediate amqp.Publishing{ - ContentType: "text/plain", + ContentType: "application/json", Body: []byte(body), Priority: prio, })