diff --git a/internal/events/rabbitmq/publisher/close.go b/internal/events/rabbitmq/publisher/close.go
index 1060926..9e06cce 100644
--- a/internal/events/rabbitmq/publisher/close.go
+++ b/internal/events/rabbitmq/publisher/close.go
@@ -23,6 +23,7 @@ func (r *rabbitmqPublisher) Close(ctx context.Context) error {
 	}
 
 	r.logger.Info().Msg("publisher closed gracefully")
+	r.closed.Store(true)
 
 	return nil
 }
diff --git a/internal/events/rabbitmq/publisher/health.go b/internal/events/rabbitmq/publisher/health.go
new file mode 100644
index 0000000..55fee51
--- /dev/null
+++ b/internal/events/rabbitmq/publisher/health.go
@@ -0,0 +1,101 @@
+package publisher
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// waitGroupCount is a sync.WaitGroup that also keeps a readable running total
+// of the deltas passed to Add and Done.
+type waitGroupCount struct {
+	sync.WaitGroup
+
+	// count is an atomic.Int64 rather than a plain int64 driven by the
+	// package-level atomic functions: the typed value is always 64-bit
+	// aligned, whereas a plain int64 placed after the embedded WaitGroup is
+	// not guaranteed to be on 32-bit platforms.
+	count atomic.Int64
+}
+
+// Add adds delta to the counter and to the embedded WaitGroup.
+func (wg *waitGroupCount) Add(delta int) {
+	wg.count.Add(int64(delta))
+	wg.WaitGroup.Add(delta)
+}
+
+// Done decrements the counter and the embedded WaitGroup by one.
+func (wg *waitGroupCount) Done() {
+	wg.count.Add(-1)
+	wg.WaitGroup.Done()
+}
+
+// GetCount returns the current value of the counter.
+func (wg *waitGroupCount) GetCount() int64 {
+	return wg.count.Load()
+}
+
+// newWaitGroupCounter returns a waitGroupCount whose counter starts at zero.
+func newWaitGroupCounter() *waitGroupCount {
+	return &waitGroupCount{}
+}
+
+const (
+	// timeCheckTicker is the interval between two health checks.
+	timeCheckTicker = 30 * time.Second
+	// timeWithoutPublishUnhealth is how long pending messages may go without
+	// a publish before the health check logs a warning.
+	timeWithoutPublishUnhealth = 30 * time.Second
+	// timePausedUnhealth is how long the publisher may stay paused before the
+	// health check logs a warning.
+	timePausedUnhealth = 30 * time.Second
+)
+
+// healthCheckLoop wakes up every timeCheckTicker, logs the number of
+// unpublished messages, and warns when the publisher has stayed paused, or has
+// held pending messages without publishing, for longer than its threshold.
+// It returns on the first tick after r.closed has been set.
+// NOTE(review): in this patch r.closed is only stored right before Close
+// returns nil; confirm the loop should keep running if Close fails earlier.
+func (r *rabbitmqPublisher) healthCheckLoop() {
+	logger := r.logger.With().Str("component", "publisher_health_check").Logger()
+
+	ticker := time.NewTicker(timeCheckTicker)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		if r.closed.Load() {
+			return
+		}
+
+		count := r.wg.GetCount()
+		logger.Debug().Int64("messages_unpublished", count).Msg("checking publisher health")
+
+		if r.isPaused() {
+			// Read the timestamp once so the comparison and both log lines
+			// report the same value and the lock is taken a single time.
+			lastPausedAt := r.getLastPausedAt()
+			logger.Debug().Time("last_paused", lastPausedAt).Msg("publisher is paused")
+			if time.Since(lastPausedAt) > timePausedUnhealth {
+				logger.Warn().
+					Int64("messages_unpublished", count).
+					Time("last_paused", lastPausedAt).
+					Msg("publisher unhealthy: paused for too long")
+			}
+			continue
+		}
+
+		if count == 0 {
+			logger.Debug().Msg("no messages pending, skipping")
+			continue
+		}
+
+		lastPublishedAt := r.getLastPublishedAt()
+		if time.Since(lastPublishedAt) > timeWithoutPublishUnhealth {
+			logger.Warn().
+				Int64("messages_unpublished", count).
+				Time("last_published", lastPublishedAt).
+				Msg("publisher unhealthy: no publishing for too long")
+		}
+	}
+}
diff --git a/internal/events/rabbitmq/publisher/notifications.go b/internal/events/rabbitmq/publisher/notifications.go
index dc846ac..03844b2 100644
--- a/internal/events/rabbitmq/publisher/notifications.go
+++ b/internal/events/rabbitmq/publisher/notifications.go
@@ -32,17 +32,12 @@ func (r *rabbitmqPublisher) handleNotifyBlocked() {
 	)
 
 	for blocking := range notifyBlockedChan {
-		r.pausePublishMux.Lock()
 		if blocking.Active {
-			r.pausePublish = true
-			if len(r.pauseSignalChan) == 0 {
-				r.pauseSignalChan <- true
-			}
+			r.pause()
 			r.logger.Warn().Msg("pausing publishing due to TCP blocking from server")
 		} else {
-			r.pausePublish = false
+			r.resume()
 			r.logger.Warn().Msg("resuming publishing due to TCP unblocking from server")
 		}
-		r.pausePublishMux.Unlock()
 	}
 }
diff --git a/internal/events/rabbitmq/publisher/pause.go b/internal/events/rabbitmq/publisher/pause.go
new file mode 100644
index 0000000..7df8f69
--- /dev/null
+++ b/internal/events/rabbitmq/publisher/pause.go
@@ -0,0 +1,49 @@
+package publisher
+
+import "time"
+
+// pause marks the publisher as paused and sends on pauseSignalChan when that
+// channel currently holds no signal.
+func (r *rabbitmqPublisher) pause() {
+	r.pausePublishMux.Lock()
+	alreadyPaused := r.pausePublish
+	r.pausePublish = true
+	r.pausePublishMux.Unlock()
+
+	// Stamp lastPausedAt only when entering the paused state. pause may be
+	// called again while already paused (for example by another failed
+	// publish); restamping then would keep postponing the "paused for too
+	// long" warning of the health check.
+	if !alreadyPaused {
+		r.lastPausedAtMux.Lock()
+		r.lastPausedAt = time.Now()
+		r.lastPausedAtMux.Unlock()
+	}
+
+	// If the channel is empty, we can send a signal to pause the publisher
+	if len(r.pauseSignalChan) == 0 {
+		r.pauseSignalChan <- true
+	}
+}
+
+// resume marks the publisher as no longer paused.
+func (r *rabbitmqPublisher) resume() {
+	r.pausePublishMux.Lock()
+	r.pausePublish = false
+	r.pausePublishMux.Unlock()
+}
+
+// isPaused reports whether the publisher is currently marked as paused.
+func (r *rabbitmqPublisher) isPaused() bool {
+	r.pausePublishMux.RLock()
+	defer r.pausePublishMux.RUnlock()
+	return r.pausePublish
+}
+
+// getLastPausedAt returns the time at which the publisher last entered the
+// paused state.
+func (r *rabbitmqPublisher) getLastPausedAt() time.Time {
+	r.lastPausedAtMux.RLock()
+	defer r.lastPausedAtMux.RUnlock()
+	return r.lastPausedAt
+}
diff --git a/internal/events/rabbitmq/publisher/publish.go b/internal/events/rabbitmq/publisher/publish.go
index 5ae85a5..6dd1eb5 100644
--- a/internal/events/rabbitmq/publisher/publish.go
+++ b/internal/events/rabbitmq/publisher/publish.go
@@ -100,14 +100,7 @@ func (r *rabbitmqPublisher) publishMessage(msg message) {
 		// If we failed to publish, it means that the connection is down.
 		// So we can pause the publisher and re-publish the event.
 		// The publisher will be unpaused when the connection is re-established.
-		r.pausePublishMux.Lock()
-		r.pausePublish = true
-		r.pausePublishMux.Unlock()
-
-		// If the channel is empty, we can send a signal to pause the publisher
-		if len(r.pauseSignalChan) == 0 {
-			r.pauseSignalChan <- true
-		}
+		r.pause()
 
 		// Re-publish the event
 		r.unpublishedMessages <- msg
@@ -138,4 +131,19 @@ func (r *rabbitmqPublisher) publishMessage(msg message) {
 	log.Ctx(msg.Context).Info().Str("topic", msg.Topic).Msg("message published")
 
 	r.wg.Done()
+	r.updatePublishedAt()
+}
+
+// updatePublishedAt stores the current time as the last publish time.
+func (r *rabbitmqPublisher) updatePublishedAt() {
+	r.lastPublishedAtMux.Lock()
+	r.lastPublishedAt = time.Now()
+	r.lastPublishedAtMux.Unlock()
+}
+
+// getLastPublishedAt returns the stored last publish time.
+func (r *rabbitmqPublisher) getLastPublishedAt() time.Time {
+	r.lastPublishedAtMux.RLock()
+	defer r.lastPublishedAtMux.RUnlock()
+	return r.lastPublishedAt
 }
diff --git a/internal/events/rabbitmq/publisher/publisher.go b/internal/events/rabbitmq/publisher/publisher.go
index cfd6fa1..2d6189f 100644
--- a/internal/events/rabbitmq/publisher/publisher.go
+++ b/internal/events/rabbitmq/publisher/publisher.go
@@ -2,6 +2,8 @@ package publisher
 
 import (
 	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/gothunder/thunder/internal/events/rabbitmq"
 	"github.com/gothunder/thunder/internal/events/rabbitmq/manager"
@@ -28,7 +30,7 @@ type rabbitmqPublisher struct {
 	publisherFunc func(message)
 
 	// Wait group used to wait for all the publishes to finish
-	wg *sync.WaitGroup
+	wg *waitGroupCount
 
 	// These flags are used to prevent the publisher from publishing messages to the queue
 	pausePublish bool
@@ -41,6 +43,16 @@ type rabbitmqPublisher struct {
 
 	// tracing
 	tracePropagator *tracing.AmqpTracePropagator
+
+	// health check
+	health             bool
+	healthMux          *sync.RWMutex
+	lastPausedAt       time.Time
+	lastPausedAtMux    *sync.RWMutex
+	lastPublishedAt    time.Time
+	lastPublishedAtMux *sync.RWMutex
+
+	closed atomic.Bool
 }
 
 func NewPublisher(amqpConf amqp.Config, log *zerolog.Logger) (events.EventPublisher, error) {
@@ -58,7 +70,7 @@ func NewPublisher(amqpConf amqp.Config, log *zerolog.Logger) (events.EventPublis
 		chManager: chManager,
 
 		unpublishedMessages: make(chan message),
-		wg:                  &sync.WaitGroup{},
+		wg:                  newWaitGroupCounter(),
 
 		pausePublish: true,
 		pausePublishMux: &sync.RWMutex{},
@@ -73,6 +85,15 @@ func NewPublisher(amqpConf amqp.Config, log *zerolog.Logger) (events.EventPublis
 		notifyPublishChan: make(chan amqp.Confirmation),
 
 		tracePropagator: tracing.NewAmqpTracing(log),
+
+		health:             true,
+		healthMux:          &sync.RWMutex{},
+		lastPausedAt:       time.Now(),
+		lastPausedAtMux:    &sync.RWMutex{},
+		lastPublishedAt:    time.Now(),
+		lastPublishedAtMux: &sync.RWMutex{},
+
+		closed: atomic.Bool{},
 	}
 
 	publisher.publisherFunc = publisher.publishMessage
diff --git a/internal/events/rabbitmq/publisher/startPublisher.go b/internal/events/rabbitmq/publisher/startPublisher.go
index fc2e935..0829611 100644
--- a/internal/events/rabbitmq/publisher/startPublisher.go
+++ b/internal/events/rabbitmq/publisher/startPublisher.go
@@ -8,6 +8,7 @@ import (
 
 func (r *rabbitmqPublisher) StartPublisher(ctx context.Context) error {
 	go r.proccessingLoop()
+	go r.healthCheckLoop()
 
 	for {
 		err := r.chManager.Channel.Confirm(false)
@@ -16,9 +17,7 @@ func (r *rabbitmqPublisher) StartPublisher(ctx context.Context) error {
 		}
 
 		r.listenForNotifications()
-		r.pausePublishMux.Lock()
-		r.pausePublish = false
-		r.pausePublishMux.Unlock()
+		r.resume()
 
 		// Wait for reconnection
 		err = <-r.chManager.NotifyReconnection
diff --git a/internal/log/correlationIDHook.go b/internal/log/correlationIDHook.go
index 4cfb9d5..97c9a8d 100644
--- a/internal/log/correlationIDHook.go
+++ b/internal/log/correlationIDHook.go
@@ -14,6 +14,6 @@ func (h CorrelationIDHook) Run(e *zerolog.Event, level zerolog.Level, msg string
 	ctx := e.GetCtx()
 	correlationID := thunderContext.CorrelationIDFromContext(ctx)
 	if correlationID != "" {
-		e.Str("correlation-id", correlationID)
+		e.Str("correlation_id", correlationID)
 	}
 }
diff --git a/internal/log/tracingHook.go b/internal/log/tracingHook.go
index b1c740b..fe88561 100644
--- a/internal/log/tracingHook.go
+++ b/internal/log/tracingHook.go
@@ -18,5 +18,5 @@ func (h TracingHook) Run(e *zerolog.Event, level zerolog.Level, msg string) {
 	spanID := spanContext.SpanID().String()
 	traceID := spanContext.TraceID().String()
 
-	e.Str("trace-id", traceID).Str("span-id", spanID)
+	e.Str("trace_id", traceID).Str("span_id", spanID)
 }