Skip to content

Commit

Permalink
Merge pull request #93 from gothunder/feature/add-named-handlers
Browse files Browse the repository at this point in the history
Adds v0 of namedHandelers. This can be improved in the future
  • Loading branch information
TheRafaBonin authored Jul 25, 2024
2 parents 6891aaa + 0c30acf commit 9cf93a8
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 7 deletions.
12 changes: 7 additions & 5 deletions pkg/events/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ type Handler interface {
/*
MatchTopicAndFormatsMessage matches the topic and formats the message.
**This is deprecated and should not be used.**
It takes the following parameters:
- ctx: the context.Context object for the function.
- decoder: the thunderEvents.EventDecoder object for decoding the message.
- referenceTopic: the reference topic string for matching.
- topic: the topic string to match against the reference topic.
- message: the message object to be formatted.
- ctx: the context.Context object for the function.
- decoder: the thunderEvents.EventDecoder object for decoding the message.
- referenceTopic: the reference topic string for matching.
- topic: the topic string to match against the reference topic.
- message: the message object to be formatted.
It returns a pointer to the formatted message object and any error encountered.
if the match fails, nil is returned.
Expand Down
35 changes: 35 additions & 0 deletions pkg/events/namedHandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package events

import "go.uber.org/fx"

// NamedHandler is a handler that has a queue name associated with it.
type NamedHandler interface {
QueuePosfix() string
Handler
}

type namedHandler struct {
Handler
queuePosfix string
}

func (n *namedHandler) QueuePosfix() string {
return n.queuePosfix
}

// NewNamedHandlerFromHandler creates a new NamedHandler from a Handler.
func NewNamedHandlerFromHandler(handler Handler, queuePosfix string) NamedHandler {
return &namedHandler{
Handler: handler,
queuePosfix: queuePosfix,
}
}

// FxAnnotateNamedHandler provides a named handler with group tags.
// This is used to provide named handlers with the group tag `group:"named_handlers"`.
// It's not a type safe function, so it's up to the caller to provide the correct type.
func FxAnnotateNamedHandler(namedHandlerFunc interface{}) fx.Option {
return fx.Provide(
fx.Annotate(namedHandlerFunc, fx.As(new(NamedHandler)), fx.ResultTags(`group:"named_handlers"`)),
)
}
36 changes: 34 additions & 2 deletions pkg/events/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,28 @@ import (
"go.uber.org/fx"
)

func NewRabbitMQConsumer(logger *zerolog.Logger, opts ...rabbitmq.RabbitmqConfigOption) (events.EventConsumer, error) {
return consumer.NewConsumer(amqp091.Config{}, logger, opts...)
type namedHandlerParams struct {
fx.In
NamedHandlers []events.NamedHandler `group:"named_handlers"`
}

func registerNamedConsumers(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, params namedHandlerParams) {
for _, namedHandler := range params.NamedHandlers {
registerNamedConsumer(lc, s, logger, namedHandler)
}
}

func registerNamedConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, namedHandler events.NamedHandler) {
consumer, err := NewRabbitMQConsumer(logger, WithQueueNamePosfix(namedHandler.QueuePosfix()))
if err != nil {
logger.Error().Err(err).Msg("failed to create consumer")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
}

registerProvidedConsumer(lc, s, logger, namedHandler, consumer)
}

func registerConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, handler events.Handler) {
Expand Down Expand Up @@ -90,6 +110,10 @@ func registerProvidedConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.
)
}

func NewRabbitMQConsumer(logger *zerolog.Logger, opts ...rabbitmq.RabbitmqConfigOption) (events.EventConsumer, error) {
return consumer.NewConsumer(amqp091.Config{}, logger, opts...)
}

// A module that provides a RabbitMQ consumer.
// The consumer will be automatically started and stopped gracefully.
// The consumer will subscribe to the provided topics.
Expand All @@ -103,3 +127,11 @@ var InvokeConsumer = fx.Invoke(
var InvokeProvidedConsumer = fx.Invoke(
registerProvidedConsumer,
)

var InvokeNamedConsumers = fx.Invoke(
registerNamedConsumers,
)

var InvokeNamedConsumer = fx.Invoke(
registerNamedConsumer,
)

0 comments on commit 9cf93a8

Please sign in to comment.