Skip to content

Commit

Permalink
Merge pull request #82 from igmagollo/feat/publisher-health-check
Browse files Browse the repository at this point in the history
feat: add unhealth warn to rabbitmq publisher
  • Loading branch information
TheRafaBonin authored Jan 31, 2024
2 parents 8028c31 + 6f428a2 commit d39dc28
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 22 deletions.
1 change: 1 addition & 0 deletions internal/events/rabbitmq/publisher/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func (r *rabbitmqPublisher) Close(ctx context.Context) error {
}

r.logger.Info().Msg("publisher closed gracefully")
r.closed.Store(true)
return nil
}

Expand Down
79 changes: 79 additions & 0 deletions internal/events/rabbitmq/publisher/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package publisher

import (
"sync"
"sync/atomic"
"time"
)

type waitGroupCount struct {
sync.WaitGroup
count int64
}

func (wg *waitGroupCount) Add(delta int) {
atomic.AddInt64(&wg.count, int64(delta))
wg.WaitGroup.Add(delta)
}

func (wg *waitGroupCount) Done() {
atomic.AddInt64(&wg.count, -1)
wg.WaitGroup.Done()
}

func (wg *waitGroupCount) GetCount() int64 {
return atomic.LoadInt64(&wg.count)
}

func newWaitGroupCounter() *waitGroupCount {
return &waitGroupCount{
WaitGroup: sync.WaitGroup{},
count: 0,
}
}

const (
timeCheckTicker = 30 * time.Second
timeWithoutPublishUnhealth = 30 * time.Second
timePausedUnhealth = 30 * time.Second
)

func (r *rabbitmqPublisher) healthCheckLoop() {
logger := r.logger.With().Str("component", "publisher_health_check").Logger()

ticker := time.NewTicker(timeCheckTicker)
defer ticker.Stop()
for {
<-ticker.C
if r.closed.Load() {
return
}

count := r.wg.GetCount()
logger.Debug().Int64("messages_unpublished", count).Msg("checking publisher health")

if r.isPaused() {
logger.Debug().Time("last_paused", r.getLastPausedAt()).Msg("publisher is paused")
if time.Since(r.getLastPausedAt()) > timePausedUnhealth {
logger.Warn().
Int64("messages_unpublished", count).
Time("last_paused", r.getLastPausedAt()).
Msg("publisher unhealthy: paused for too long")
}
continue
}

if count == 0 {
logger.Debug().Msg("no messages pending, skipping")
continue
}

lastPublishedAt := r.getLastPublishedAt()
if time.Since(lastPublishedAt) > timeWithoutPublishUnhealth {
logger.Warn().
Int64("messages_unpublished", count).
Time("last_published", lastPublishedAt).
Msg("publisher unhealthy: no publishing for too long")
}
}
}
9 changes: 2 additions & 7 deletions internal/events/rabbitmq/publisher/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,12 @@ func (r *rabbitmqPublisher) handleNotifyBlocked() {
)

for blocking := range notifyBlockedChan {
r.pausePublishMux.Lock()
if blocking.Active {
r.pausePublish = true
if len(r.pauseSignalChan) == 0 {
r.pauseSignalChan <- true
}
r.pause()
r.logger.Warn().Msg("pausing publishing due to TCP blocking from server")
} else {
r.pausePublish = false
r.resume()
r.logger.Warn().Msg("resuming publishing due to TCP unblocking from server")
}
r.pausePublishMux.Unlock()
}
}
35 changes: 35 additions & 0 deletions internal/events/rabbitmq/publisher/pause.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package publisher

import "time"

func (r *rabbitmqPublisher) pause() {
r.pausePublishMux.Lock()
r.pausePublish = true
r.pausePublishMux.Unlock()

r.lastPausedAtMux.Lock()
r.lastPausedAt = time.Now()
r.lastPausedAtMux.Unlock()

if len(r.pauseSignalChan) == 0 {
r.pauseSignalChan <- true
}
}

func (r *rabbitmqPublisher) resume() {
r.pausePublishMux.Lock()
r.pausePublish = false
r.pausePublishMux.Unlock()
}

func (r *rabbitmqPublisher) isPaused() bool {
r.pausePublishMux.RLock()
defer r.pausePublishMux.RUnlock()
return r.pausePublish
}

func (r *rabbitmqPublisher) getLastPausedAt() time.Time {
r.lastPausedAtMux.RLock()
defer r.lastPausedAtMux.RUnlock()
return r.lastPausedAt
}
22 changes: 14 additions & 8 deletions internal/events/rabbitmq/publisher/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,7 @@ func (r *rabbitmqPublisher) publishMessage(msg message) {
// If we failed to publish, it means that the connection is down.
// So we can pause the publisher and re-publish the event.
// The publisher will be unpaused when the connection is re-established.
r.pausePublishMux.Lock()
r.pausePublish = true
r.pausePublishMux.Unlock()

// If the channel is empty, we can send a signal to pause the publisher
if len(r.pauseSignalChan) == 0 {
r.pauseSignalChan <- true
}
r.pause()

// Re-publish the event
r.unpublishedMessages <- msg
Expand Down Expand Up @@ -138,4 +131,17 @@ func (r *rabbitmqPublisher) publishMessage(msg message) {

log.Ctx(msg.Context).Info().Str("topic", msg.Topic).Msg("message published")
r.wg.Done()
r.updatePublishedAt()
}

func (r *rabbitmqPublisher) updatePublishedAt() {
r.lastPublishedAtMux.Lock()
r.lastPublishedAt = time.Now()
r.lastPublishedAtMux.Unlock()
}

func (r *rabbitmqPublisher) getLastPublishedAt() time.Time {
r.lastPublishedAtMux.RLock()
defer r.lastPublishedAtMux.RUnlock()
return r.lastPublishedAt
}
25 changes: 23 additions & 2 deletions internal/events/rabbitmq/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package publisher

import (
"sync"
"sync/atomic"
"time"

"github.com/gothunder/thunder/internal/events/rabbitmq"
"github.com/gothunder/thunder/internal/events/rabbitmq/manager"
Expand All @@ -28,7 +30,7 @@ type rabbitmqPublisher struct {
publisherFunc func(message)

// Wait group used to wait for all the publishes to finish
wg *sync.WaitGroup
wg *waitGroupCount

// These flags are used to prevent the publisher from publishing messages to the queue
pausePublish bool
Expand All @@ -41,6 +43,16 @@ type rabbitmqPublisher struct {

// tracing
tracePropagator *tracing.AmqpTracePropagator

// health check
health bool
healthMux *sync.RWMutex
lastPausedAt time.Time
lastPausedAtMux *sync.RWMutex
lastPublishedAt time.Time
lastPublishedAtMux *sync.RWMutex

closed atomic.Bool
}

func NewPublisher(amqpConf amqp.Config, log *zerolog.Logger) (events.EventPublisher, error) {
Expand All @@ -58,7 +70,7 @@ func NewPublisher(amqpConf amqp.Config, log *zerolog.Logger) (events.EventPublis
chManager: chManager,

unpublishedMessages: make(chan message),
wg: &sync.WaitGroup{},
wg: newWaitGroupCounter(),

pausePublish: true,
pausePublishMux: &sync.RWMutex{},
Expand All @@ -73,6 +85,15 @@ func NewPublisher(amqpConf amqp.Config, log *zerolog.Logger) (events.EventPublis
notifyPublishChan: make(chan amqp.Confirmation),

tracePropagator: tracing.NewAmqpTracing(log),

health: true,
healthMux: &sync.RWMutex{},
lastPausedAt: time.Now(),
lastPausedAtMux: &sync.RWMutex{},
lastPublishedAt: time.Now(),
lastPublishedAtMux: &sync.RWMutex{},

closed: atomic.Bool{},
}
publisher.publisherFunc = publisher.publishMessage

Expand Down
5 changes: 2 additions & 3 deletions internal/events/rabbitmq/publisher/startPublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

func (r *rabbitmqPublisher) StartPublisher(ctx context.Context) error {
go r.proccessingLoop()
go r.healthCheckLoop()

for {
err := r.chManager.Channel.Confirm(false)
Expand All @@ -16,9 +17,7 @@ func (r *rabbitmqPublisher) StartPublisher(ctx context.Context) error {
}
r.listenForNotifications()

r.pausePublishMux.Lock()
r.pausePublish = false
r.pausePublishMux.Unlock()
r.resume()

// Wait for reconnection
err = <-r.chManager.NotifyReconnection
Expand Down
2 changes: 1 addition & 1 deletion internal/log/correlationIDHook.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ func (h CorrelationIDHook) Run(e *zerolog.Event, level zerolog.Level, msg string
ctx := e.GetCtx()
correlationID := thunderContext.CorrelationIDFromContext(ctx)
if correlationID != "" {
e.Str("correlation-id", correlationID)
e.Str("correlation_id", correlationID)
}
}
2 changes: 1 addition & 1 deletion internal/log/tracingHook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ func (h TracingHook) Run(e *zerolog.Event, level zerolog.Level, msg string) {
spanID := spanContext.SpanID().String()
traceID := spanContext.TraceID().String()

e.Str("trace-id", traceID).Str("span-id", spanID)
e.Str("trace_id", traceID).Str("span_id", spanID)
}

0 comments on commit d39dc28

Please sign in to comment.