From 3accaf9b83285fb41ec08c554aee004a636bd36e Mon Sep 17 00:00:00 2001 From: Igor Magollo Date: Wed, 31 Jan 2024 09:44:19 -0300 Subject: [PATCH 1/3] feat: add unhealth warn to rabbitmq publisher --- internal/events/rabbitmq/publisher/close.go | 1 + internal/events/rabbitmq/publisher/health.go | 80 +++++++++++++++++++ .../rabbitmq/publisher/notifications.go | 9 +-- internal/events/rabbitmq/publisher/pause.go | 35 ++++++++ internal/events/rabbitmq/publisher/publish.go | 12 +-- .../events/rabbitmq/publisher/publisher.go | 25 +++++- .../rabbitmq/publisher/startPublisher.go | 5 +- 7 files changed, 147 insertions(+), 20 deletions(-) create mode 100644 internal/events/rabbitmq/publisher/health.go create mode 100644 internal/events/rabbitmq/publisher/pause.go diff --git a/internal/events/rabbitmq/publisher/close.go b/internal/events/rabbitmq/publisher/close.go index 1060926..9e06cce 100644 --- a/internal/events/rabbitmq/publisher/close.go +++ b/internal/events/rabbitmq/publisher/close.go @@ -23,6 +23,7 @@ func (r *rabbitmqPublisher) Close(ctx context.Context) error { } r.logger.Info().Msg("publisher closed gracefully") + r.closed.Store(true) return nil } diff --git a/internal/events/rabbitmq/publisher/health.go b/internal/events/rabbitmq/publisher/health.go new file mode 100644 index 0000000..289e052 --- /dev/null +++ b/internal/events/rabbitmq/publisher/health.go @@ -0,0 +1,80 @@ +package publisher + +import ( + "sync" + "sync/atomic" + "time" +) + +type waitGroupCount struct { + sync.WaitGroup + count int64 +} + +func (wg *waitGroupCount) Add(delta int) { + atomic.AddInt64(&wg.count, int64(delta)) + wg.WaitGroup.Add(delta) +} + +func (wg *waitGroupCount) Done() { + atomic.AddInt64(&wg.count, -1) + wg.WaitGroup.Done() +} + +func (wg *waitGroupCount) GetCount() int64 { + return atomic.LoadInt64(&wg.count) +} + +func newWaitGroupCounter() *waitGroupCount { + return &waitGroupCount{ + WaitGroup: sync.WaitGroup{}, + count: 0, + } +} + +const ( + timeWithoutPublishUnhealth = 30 * time.Second + timePausedUnhealth = 30 * time.Second +) + +func (r *rabbitmqPublisher) healthCheckLoop() { + logger := r.logger.With().Str("component", "publisher_health_check").Logger() + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + <-ticker.C + if r.closed.Load() { + return + } + + count := r.wg.GetCount() + logger.Debug().Int64("messages_unpublished", count).Msg("checking publisher health") + + if r.isPaused() { + logger.Debug().Time("last_paused", r.getLastPausedAt()).Msg("publisher is paused") + if time.Since(r.getLastPausedAt()) > timePausedUnhealth { + logger.Warn(). + Int64("messages_unpublished", count). + Time("last_paused", r.getLastPausedAt()). + Msg("publisher unhealthy: paused for too long") + } + continue + } + + if count == 0 { + logger.Debug().Msg("no messages pending, skipping") + continue + } + + r.lastPublishedAtMux.RLock() + lastPublishedAt := r.lastPublishedAt + r.lastPublishedAtMux.RUnlock() + if time.Since(lastPublishedAt) > timeWithoutPublishUnhealth { + logger.Warn(). + Int64("messages_unpublished", count). + Time("last_published", lastPublishedAt). + Msg("publisher unhealthy: no publishing for too long") + } + } +} diff --git a/internal/events/rabbitmq/publisher/notifications.go b/internal/events/rabbitmq/publisher/notifications.go index dc846ac..03844b2 100644 --- a/internal/events/rabbitmq/publisher/notifications.go +++ b/internal/events/rabbitmq/publisher/notifications.go @@ -32,17 +32,12 @@ func (r *rabbitmqPublisher) handleNotifyBlocked() { ) for blocking := range notifyBlockedChan { - r.pausePublishMux.Lock() if blocking.Active { - r.pausePublish = true - if len(r.pauseSignalChan) == 0 { - r.pauseSignalChan <- true - } + r.pause() r.logger.Warn().Msg("pausing publishing due to TCP blocking from server") } else { - r.pausePublish = false + r.resume() r.logger.Warn().Msg("resuming publishing due to TCP unblocking from server") } - r.pausePublishMux.Unlock() } } diff --git a/internal/events/rabbitmq/publisher/pause.go b/internal/events/rabbitmq/publisher/pause.go new file mode 100644 index 0000000..7df8f69 --- /dev/null +++ b/internal/events/rabbitmq/publisher/pause.go @@ -0,0 +1,35 @@ +package publisher + +import "time" + +func (r *rabbitmqPublisher) pause() { + r.pausePublishMux.Lock() + r.pausePublish = true + r.pausePublishMux.Unlock() + + r.lastPausedAtMux.Lock() + r.lastPausedAt = time.Now() + r.lastPausedAtMux.Unlock() + + if len(r.pauseSignalChan) == 0 { + r.pauseSignalChan <- true + } +} + +func (r *rabbitmqPublisher) resume() { + r.pausePublishMux.Lock() + r.pausePublish = false + r.pausePublishMux.Unlock() +} + +func (r *rabbitmqPublisher) isPaused() bool { + r.pausePublishMux.RLock() + defer r.pausePublishMux.RUnlock() + return r.pausePublish +} + +func (r *rabbitmqPublisher) getLastPausedAt() time.Time { + r.lastPausedAtMux.RLock() + defer r.lastPausedAtMux.RUnlock() + return r.lastPausedAt +} diff --git a/internal/events/rabbitmq/publisher/publish.go b/internal/events/rabbitmq/publisher/publish.go index 5ae85a5..3815531 100644 --- a/internal/events/rabbitmq/publisher/publish.go +++ b/internal/events/rabbitmq/publisher/publish.go @@ -100,14 +100,7 @@ func (r *rabbitmqPublisher) publishMessage(msg message) { // If we failed to publish, it means that the connection is down. // So we can pause the publisher and re-publish the event. // The publisher will be unpaused when the connection is re-established. - r.pausePublishMux.Lock() - r.pausePublish = true - r.pausePublishMux.Unlock() - - // If the channel is empty, we can send a signal to pause the publisher - if len(r.pauseSignalChan) == 0 { - r.pauseSignalChan <- true - } + r.pause() // Re-publish the event r.unpublishedMessages <- msg @@ -138,4 +131,7 @@ func (r *rabbitmqPublisher) publishMessage(msg message) { log.Ctx(msg.Context).Info().Str("topic", msg.Topic).Msg("message published") r.wg.Done() + r.lastPublishedAtMux.Lock() + r.lastPublishedAt = time.Now() + r.lastPublishedAtMux.Unlock() } diff --git a/internal/events/rabbitmq/publisher/publisher.go b/internal/events/rabbitmq/publisher/publisher.go index cfd6fa1..2d6189f 100644 --- a/internal/events/rabbitmq/publisher/publisher.go +++ b/internal/events/rabbitmq/publisher/publisher.go @@ -2,6 +2,8 @@ package publisher import ( "sync" + "sync/atomic" + "time" "github.com/gothunder/thunder/internal/events/rabbitmq" "github.com/gothunder/thunder/internal/events/rabbitmq/manager" @@ -28,7 +30,7 @@ type rabbitmqPublisher struct { publisherFunc func(message) // Wait group used to wait for all the publishes to finish - wg *sync.WaitGroup + wg *waitGroupCount // These flags are used to prevent the publisher from publishing messages to the queue pausePublish bool @@ -41,6 +43,16 @@ type rabbitmqPublisher struct { // tracing tracePropagator *tracing.AmqpTracePropagator + + // health check + health bool + healthMux *sync.RWMutex + lastPausedAt time.Time + lastPausedAtMux *sync.RWMutex + lastPublishedAt time.Time + lastPublishedAtMux *sync.RWMutex + + closed atomic.Bool } func NewPublisher(amqpConf amqp.Config, log *zerolog.Logger) (events.EventPublisher, error) { @@ -58,7 +70,7 @@ func NewPublisher(amqpConf amqp.Config, log *zerolog.Logger) (events.EventPublis chManager: chManager, unpublishedMessages: make(chan message), - wg: &sync.WaitGroup{}, + wg: newWaitGroupCounter(), pausePublish: true, pausePublishMux: &sync.RWMutex{}, @@ -73,6 +85,15 @@ func NewPublisher(amqpConf amqp.Config, log *zerolog.Logger) (events.EventPublis notifyPublishChan: make(chan amqp.Confirmation), tracePropagator: tracing.NewAmqpTracing(log), + + health: true, + healthMux: &sync.RWMutex{}, + lastPausedAt: time.Now(), + lastPausedAtMux: &sync.RWMutex{}, + lastPublishedAt: time.Now(), + lastPublishedAtMux: &sync.RWMutex{}, + + closed: atomic.Bool{}, } publisher.publisherFunc = publisher.publishMessage diff --git a/internal/events/rabbitmq/publisher/startPublisher.go b/internal/events/rabbitmq/publisher/startPublisher.go index fc2e935..0829611 100644 --- a/internal/events/rabbitmq/publisher/startPublisher.go +++ b/internal/events/rabbitmq/publisher/startPublisher.go @@ -8,6 +8,7 @@ import ( func (r *rabbitmqPublisher) StartPublisher(ctx context.Context) error { go r.proccessingLoop() + go r.healthCheckLoop() for { err := r.chManager.Channel.Confirm(false) @@ -16,9 +17,7 @@ func (r *rabbitmqPublisher) StartPublisher(ctx context.Context) error { } r.listenForNotifications() - r.pausePublishMux.Lock() - r.pausePublish = false - r.pausePublishMux.Unlock() + r.resume() // Wait for reconnection err = <-r.chManager.NotifyReconnection From 6ab7191b70de3a6b6f540fac3b87f5d5a21d0c39 Mon Sep 17 00:00:00 2001 From: Igor Magollo Date: Wed, 31 Jan 2024 10:12:21 -0300 Subject: [PATCH 2/3] fix: change '-' for '_' in log hooks to properly correlation --- internal/log/correlationIDHook.go | 2 +- internal/log/tracingHook.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/log/correlationIDHook.go b/internal/log/correlationIDHook.go index 4cfb9d5..97c9a8d 100644 --- a/internal/log/correlationIDHook.go +++ b/internal/log/correlationIDHook.go @@ -14,6 +14,6 @@ func (h CorrelationIDHook) Run(e *zerolog.Event, level zerolog.Level, msg string ctx := e.GetCtx() correlationID := thunderContext.CorrelationIDFromContext(ctx) if correlationID != "" { - e.Str("correlation-id", correlationID) + e.Str("correlation_id", correlationID) } } diff --git a/internal/log/tracingHook.go b/internal/log/tracingHook.go index b1c740b..fe88561 100644 --- a/internal/log/tracingHook.go +++ b/internal/log/tracingHook.go @@ -18,5 +18,5 @@ func (h TracingHook) Run(e *zerolog.Event, level zerolog.Level, msg string) { spanID := spanContext.SpanID().String() traceID := spanContext.TraceID().String() - e.Str("trace-id", traceID).Str("span-id", spanID) + e.Str("trace_id", traceID).Str("span_id", spanID) } From 6f428a23245a794d39457e0f5c1ab7e786a3d66c Mon Sep 17 00:00:00 2001 From: Igor Magollo Date: Wed, 31 Jan 2024 11:15:50 -0300 Subject: [PATCH 3/3] fix: resolve conversations --- internal/events/rabbitmq/publisher/health.go | 7 +++---- internal/events/rabbitmq/publisher/publish.go | 10 ++++++++++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/internal/events/rabbitmq/publisher/health.go b/internal/events/rabbitmq/publisher/health.go index 289e052..55fee51 100644 --- a/internal/events/rabbitmq/publisher/health.go +++ b/internal/events/rabbitmq/publisher/health.go @@ -33,6 +33,7 @@ func newWaitGroupCounter() *waitGroupCount { } const ( + timeCheckTicker = 30 * time.Second timeWithoutPublishUnhealth = 30 * time.Second timePausedUnhealth = 30 * time.Second ) @@ -40,7 +41,7 @@ const ( func (r *rabbitmqPublisher) healthCheckLoop() { logger := r.logger.With().Str("component", "publisher_health_check").Logger() - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(timeCheckTicker) defer ticker.Stop() for { <-ticker.C @@ -67,9 +68,7 @@ func (r *rabbitmqPublisher) healthCheckLoop() { continue } - r.lastPublishedAtMux.RLock() - lastPublishedAt := r.lastPublishedAt - r.lastPublishedAtMux.RUnlock() + lastPublishedAt := r.getLastPublishedAt() if time.Since(lastPublishedAt) > timeWithoutPublishUnhealth { logger.Warn(). Int64("messages_unpublished", count). diff --git a/internal/events/rabbitmq/publisher/publish.go b/internal/events/rabbitmq/publisher/publish.go index 3815531..6dd1eb5 100644 --- a/internal/events/rabbitmq/publisher/publish.go +++ b/internal/events/rabbitmq/publisher/publish.go @@ -131,7 +131,17 @@ func (r *rabbitmqPublisher) publishMessage(msg message) { log.Ctx(msg.Context).Info().Str("topic", msg.Topic).Msg("message published") r.wg.Done() + r.updatePublishedAt() +} + +func (r *rabbitmqPublisher) updatePublishedAt() { r.lastPublishedAtMux.Lock() r.lastPublishedAt = time.Now() r.lastPublishedAtMux.Unlock() } + +func (r *rabbitmqPublisher) getLastPublishedAt() time.Time { + r.lastPublishedAtMux.RLock() + defer r.lastPublishedAtMux.RUnlock() + return r.lastPublishedAt +}