diff --git a/cmd/cli/main.go b/cmd/cli/main.go index a8130361a..a00510ade 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -469,7 +469,7 @@ func openFile(filePath string, mode int) (*os.File, error) { if filePath == "" { return nil, fmt.Errorf("file is not specified") } - file, err := os.OpenFile(filePath, mode, 0666) //nolint:gofumpt,gomnd + file, err := os.OpenFile(filePath, mode, 0o666) //nolint:gofumpt,gomnd if err != nil { return nil, fmt.Errorf("cannot open file: %w", err) } diff --git a/cmd/config.go b/cmd/config.go index 0d34d7da0..7a5ce2321 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -258,7 +258,7 @@ func (config *GraphiteRemoteConfig) GetRemoteSourceSettings() *graphiteRemoteSou } } -// GraphiteRemoteConfig is remote prometheus settings structure. +// PrometheusRemoteConfig is remote prometheus settings structure. type PrometheusRemoteConfig struct { RemoteCommonConfig `yaml:",inline"` // Timeout for prometheus api requests @@ -277,7 +277,7 @@ func (config PrometheusRemoteConfig) getRemoteCommon() *RemoteCommonConfig { return &config.RemoteCommonConfig } -// GetRemoteSourceSettings returns remote config parsed from moira config files. +// GetPrometheusSourceSettings returns remote config parsed from moira config files. func (config *PrometheusRemoteConfig) GetPrometheusSourceSettings() *prometheusRemoteSource.Config { return &prometheusRemoteSource.Config{ URL: config.URL, diff --git a/cmd/notifier/config.go b/cmd/notifier/config.go index 8a210e686..8277024b9 100644 --- a/cmd/notifier/config.go +++ b/cmd/notifier/config.go @@ -38,6 +38,8 @@ type notifierConfig struct { SenderTimeout string `yaml:"sender_timeout"` // Hard timeout to stop retrying to send notification after multiple failed attempts ResendingTimeout string `yaml:"resending_timeout"` + // Delay before performing one more send attempt + ReschedulingDelay string `yaml:"rescheduling_delay"` // Senders configuration section. See https://moira.readthedocs.io/en/latest/installation/configuration.html for more explanation Senders []map[string]interface{} `yaml:"senders"` // Self state monitor configuration section. Note: No inner subscriptions is required. It's own notification mechanism will be used. @@ -94,15 +96,16 @@ func getDefault() config { NotificationHistoryQueryLimit: int(notifier.NotificationsLimitUnlimited), }, Notification: cmd.NotificationConfig{ - DelayedTime: "1m", + DelayedTime: "50s", TransactionTimeout: "100ms", TransactionMaxRetries: 10, TransactionHeuristicLimit: 10000, ResaveTime: "30s", }, Notifier: notifierConfig{ - SenderTimeout: "10s", - ResendingTimeout: "1:00", + SenderTimeout: "10s", + ResendingTimeout: "1:00", + ReschedulingDelay: "60s", SelfState: selfStateConfig{ Enabled: false, RedisDisconnectDelay: "30s", @@ -191,6 +194,7 @@ func (config *notifierConfig) getSettings(logger moira.Logger) notifier.Config { SelfStateContacts: config.SelfState.Contacts, SendingTimeout: to.Duration(config.SenderTimeout), ResendingTimeout: to.Duration(config.ResendingTimeout), + ReschedulingDelay: to.Duration(config.ReschedulingDelay), Senders: config.Senders, FrontURL: config.FrontURI, Location: location, diff --git a/cmd/notifier/main.go b/cmd/notifier/main.go index 4309dfa29..37e1ac7f7 100644 --- a/cmd/notifier/main.go +++ b/cmd/notifier/main.go @@ -7,6 +7,8 @@ import ( "os/signal" "syscall" + "github.com/moira-alert/moira/clock" + "github.com/moira-alert/moira" "github.com/moira-alert/moira/cmd" "github.com/moira-alert/moira/database/redis" @@ -91,6 +93,11 @@ func main() { notifierConfig := config.Notifier.getSettings(logger) + systemClock := clock.NewSystemClock() + schedulerConfig := notifier.SchedulerConfig{ + ReschedulingDelay: notifierConfig.ReschedulingDelay, + } + notifierMetrics := metrics.ConfigureNotifierMetrics(telemetry.Metrics, serviceName) sender := notifier.NewNotifier( database, @@ -99,6 +106,8 @@ func main() { notifierMetrics, metricSourceProvider, imageStoreMap, + systemClock, + schedulerConfig, ) // Register moira senders @@ -133,11 +142,16 @@ func main() { // Start moira new events fetcher fetchEventsWorker := &events.FetchEventsWorker{ - Logger: logger, - Database: database, - Scheduler: notifier.NewScheduler(database, logger, notifierMetrics), - Metrics: notifierMetrics, - Config: notifierConfig, + Logger: logger, + Database: database, + Scheduler: notifier.NewScheduler( + database, + logger, + notifierMetrics, + schedulerConfig, + systemClock), + Metrics: notifierMetrics, + Config: notifierConfig, } fetchEventsWorker.Start() defer stopFetchEvents(fetchEventsWorker) diff --git a/datatypes.go b/datatypes.go index 1e82631f8..7756697a6 100644 --- a/datatypes.go +++ b/datatypes.go @@ -304,7 +304,7 @@ func (notification *ScheduledNotification) Less(other Comparable) (bool, error) return notification.Timestamp < otherNotification.Timestamp, nil } -// IsDelayed checks if the notification is delayed, the difference between the send time and the create time +// IsDelayed checks if the notification is delayed, the difference between the send time and the creation time // is greater than the delayedTime. func (notification *ScheduledNotification) IsDelayed(delayedTime int64) bool { return notification.CreatedAt != 0 && notification.Timestamp-notification.CreatedAt > delayedTime @@ -905,3 +905,14 @@ func SetMaintenanceUserAndTime(maintenanceCheck MaintenanceCheck, maintenance in } maintenanceCheck.SetMaintenance(&maintenanceInfo, maintenance) } + +// SchedulerParams is the parameters for notifier.Scheduler essential for scheduling notification. +type SchedulerParams struct { + Event NotificationEvent + Trigger TriggerData + Contact ContactData + Plotting PlottingData + ThrottledOld bool + // SendFail is amount of failed send attempts + SendFail int +} diff --git a/integration_tests/notifier/notifier_test.go b/integration_tests/notifier/notifier_test.go index 52f1ef477..6d9812e75 100644 --- a/integration_tests/notifier/notifier_test.go +++ b/integration_tests/notifier/notifier_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "github.com/moira-alert/moira/clock" + "github.com/golang/mock/gomock" metricSource "github.com/moira-alert/moira/metric_source" "github.com/moira-alert/moira/metric_source/local" @@ -30,11 +32,12 @@ var ( ) var notifierConfig = notifier.Config{ - SendingTimeout: time.Millisecond * 10, - ResendingTimeout: time.Hour * 24, - Location: location, - DateTimeFormat: dateTimeFormat, - ReadBatchSize: notifier.NotificationsLimitUnlimited, + SendingTimeout: time.Millisecond * 10, + ResendingTimeout: time.Hour * 24, + ReschedulingDelay: time.Minute, + Location: location, + DateTimeFormat: dateTimeFormat, + ReadBatchSize: notifier.NotificationsLimitUnlimited, } var shutdown = make(chan struct{}) @@ -119,6 +122,9 @@ func TestNotifier(t *testing.T) { metricsSourceProvider := metricSource.CreateTestMetricSourceProvider(local.Create(database), nil, nil) + systemClock := clock.NewSystemClock() + schedulerConfig := notifier.SchedulerConfig{ReschedulingDelay: notifierConfig.ReschedulingDelay} + notifierInstance := notifier.NewNotifier( database, logger, @@ -126,6 +132,8 @@ func TestNotifier(t *testing.T) { notifierMetrics, metricsSourceProvider, map[string]moira.ImageStore{}, + systemClock, + schedulerConfig, ) sender := mock_moira_alert.NewMockSender(mockCtrl) @@ -141,10 +149,17 @@ func TestNotifier(t *testing.T) { notifierInstance.RegisterSender(senderSettings, sender) //nolint fetchEventsWorker := events.FetchEventsWorker{ - Database: database, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(database, logger, notifierMetrics), + Database: database, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + database, + logger, + notifierMetrics, + notifier.SchedulerConfig{ + ReschedulingDelay: notifierConfig.ReschedulingDelay, + }, + systemClock), } fetchNotificationsWorker := notifications.FetchNotificationsWorker{ diff --git a/local/notifier.yml b/local/notifier.yml index 63aa9cf9d..1fd44427a 100644 --- a/local/notifier.yml +++ b/local/notifier.yml @@ -29,6 +29,7 @@ prometheus_remote: notifier: sender_timeout: 10s resending_timeout: "1:00" + rescheduling_delay: 60s senders: [] moira_selfstate: enabled: false @@ -45,7 +46,7 @@ notification_history: ttl: 48h query_limit: 10000 notification: - delayed_time: 1m + delayed_time: 50s transaction_timeout: 100ms transaction_max_retries: 10 transaction_heuristic_limit: 10000 diff --git a/logging/zerolog_adapter/logger.go b/logging/zerolog_adapter/logger.go index c47d89f01..fc3b446fd 100644 --- a/logging/zerolog_adapter/logger.go +++ b/logging/zerolog_adapter/logger.go @@ -62,10 +62,10 @@ func getLogWriter(logFileName string) (io.Writer, error) { } logDir := filepath.Dir(logFileName) - if err := os.MkdirAll(logDir, 0755); err != nil { //nolint:gofumpt,gomnd + if err := os.MkdirAll(logDir, 0o755); err != nil { //nolint:gofumpt,gomnd return nil, fmt.Errorf("can't create log directories %s: %s", logDir, err.Error()) } - logFile, err := os.OpenFile(logFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) //nolint:gofumpt,gomnd + logFile, err := os.OpenFile(logFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644) //nolint:gofumpt,gomnd if err != nil { return nil, fmt.Errorf("can't open log file %s: %s", logFileName, err.Error()) } diff --git a/mock/scheduler/scheduler.go b/mock/scheduler/scheduler.go index 2f45c0d4c..d8a0a5b71 100644 --- a/mock/scheduler/scheduler.go +++ b/mock/scheduler/scheduler.go @@ -6,7 +6,6 @@ package mock_scheduler import ( reflect "reflect" - time "time" gomock "github.com/golang/mock/gomock" moira "github.com/moira-alert/moira" @@ -36,15 +35,15 @@ func (m *MockScheduler) EXPECT() *MockSchedulerMockRecorder { } // ScheduleNotification mocks base method. -func (m *MockScheduler) ScheduleNotification(arg0 time.Time, arg1 moira.NotificationEvent, arg2 moira.TriggerData, arg3 moira.ContactData, arg4 moira.PlottingData, arg5 bool, arg6 int, arg7 moira.Logger) *moira.ScheduledNotification { +func (m *MockScheduler) ScheduleNotification(arg0 moira.SchedulerParams, arg1 moira.Logger) *moira.ScheduledNotification { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ScheduleNotification", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) + ret := m.ctrl.Call(m, "ScheduleNotification", arg0, arg1) ret0, _ := ret[0].(*moira.ScheduledNotification) return ret0 } // ScheduleNotification indicates an expected call of ScheduleNotification. -func (mr *MockSchedulerMockRecorder) ScheduleNotification(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7 interface{}) *gomock.Call { +func (mr *MockSchedulerMockRecorder) ScheduleNotification(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleNotification", reflect.TypeOf((*MockScheduler)(nil).ScheduleNotification), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleNotification", reflect.TypeOf((*MockScheduler)(nil).ScheduleNotification), arg0, arg1) } diff --git a/notifier/config.go b/notifier/config.go index 2d01b738f..a89b4b7ec 100644 --- a/notifier/config.go +++ b/notifier/config.go @@ -13,6 +13,7 @@ type Config struct { SelfStateContacts []map[string]string SendingTimeout time.Duration ResendingTimeout time.Duration + ReschedulingDelay time.Duration Senders []map[string]interface{} LogFile string LogLevel string diff --git a/notifier/events/event.go b/notifier/events/event.go index 62d72571f..c6662200e 100644 --- a/notifier/events/event.go +++ b/notifier/events/event.go @@ -149,8 +149,15 @@ func (worker *FetchEventsWorker) processEvent(event moira.NotificationEvent) err continue } event.SubscriptionID = &subscription.ID - notification := worker.Scheduler.ScheduleNotification(time.Now(), event, triggerData, - contact, subscription.Plotting, false, 0, contactLogger) + params := moira.SchedulerParams{ + Event: event, + Trigger: triggerData, + Contact: contact, + Plotting: subscription.Plotting, + ThrottledOld: false, + SendFail: 0, + } + notification := worker.Scheduler.ScheduleNotification(params, contactLogger) key := notification.GetKey() if _, exist := duplications[key]; !exist { if err := worker.Database.AddNotification(notification); err != nil { diff --git a/notifier/events/event_test.go b/notifier/events/event_test.go index e1a8c3620..d4727ea5f 100644 --- a/notifier/events/event_test.go +++ b/notifier/events/event_test.go @@ -12,6 +12,7 @@ import ( "github.com/moira-alert/moira" "github.com/moira-alert/moira/database" "github.com/moira-alert/moira/metrics" + mock_clock "github.com/moira-alert/moira/mock/clock" mock_moira_alert "github.com/moira-alert/moira/mock/moira-alert" mock_scheduler "github.com/moira-alert/moira/mock/scheduler" "github.com/moira-alert/moira/notifier" @@ -25,14 +26,23 @@ func TestEvent(t *testing.T) { dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) scheduler := mock_scheduler.NewMockScheduler(mockCtrl) logger, _ := logging.GetLogger("Events") + systemClock := mock_clock.NewMockClock(mockCtrl) + systemClock.EXPECT().Now().Return(time.Now()).AnyTimes() Convey("When event is TEST and subscription is disabled, should add new notification", t, func() { worker := FetchEventsWorker{ - Database: dataBase, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(dataBase, logger, notifierMetrics), - Config: emptyNotifierConfig, + Database: dataBase, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + dataBase, + logger, + notifierMetrics, + notifier.SchedulerConfig{ + ReschedulingDelay: emptyNotifierConfig.ReschedulingDelay, + }, + systemClock), + Config: emptyNotifierConfig, } event := moira.NotificationEvent{ State: moira.StateTEST, @@ -49,8 +59,8 @@ func TestEvent(t *testing.T) { SubscriptionID: event.SubscriptionID, }, SendFail: 0, - Timestamp: time.Now().Unix(), - CreatedAt: time.Now().Unix(), + Timestamp: systemClock.Now().Unix(), + CreatedAt: systemClock.Now().Unix(), Throttled: false, Contact: contact, } @@ -78,7 +88,7 @@ func TestEvent(t *testing.T) { } dataBase.EXPECT().GetContact(event.ContactID).Times(1).Return(contact, nil) dataBase.EXPECT().GetContact(contact.ID).Times(1).Return(contact, nil) - now := time.Now() + now := systemClock.Now() notification := moira.ScheduledNotification{ Event: moira.NotificationEvent{ TriggerID: "", @@ -95,7 +105,17 @@ func TestEvent(t *testing.T) { } event2 := event event2.SubscriptionID = &subID - scheduler.EXPECT().ScheduleNotification(gomock.Any(), event2, moira.TriggerData{}, contact, notification.Plotting, false, 0, gomock.Any()).Return(¬ification) + + params := moira.SchedulerParams{ + Event: event2, + Trigger: moira.TriggerData{}, + Contact: contact, + Plotting: notification.Plotting, + ThrottledOld: false, + SendFail: 0, + } + + scheduler.EXPECT().ScheduleNotification(params, gomock.Any()).Return(¬ification) dataBase.EXPECT().AddNotification(¬ification) err := worker.processEvent(event) @@ -109,13 +129,21 @@ func TestNoSubscription(t *testing.T) { defer mockCtrl.Finish() dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) logger, _ := logging.GetLogger("Events") + systemClock := mock_clock.NewMockClock(mockCtrl) worker := FetchEventsWorker{ - Database: dataBase, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(dataBase, logger, notifierMetrics), - Config: emptyNotifierConfig, + Database: dataBase, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + dataBase, + logger, + notifierMetrics, + notifier.SchedulerConfig{ + ReschedulingDelay: emptyNotifierConfig.ReschedulingDelay, + }, + systemClock), + Config: emptyNotifierConfig, } event := moira.NotificationEvent{ @@ -140,13 +168,21 @@ func TestDisabledNotification(t *testing.T) { dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) logger := mock_moira_alert.NewMockLogger(mockCtrl) eventBuilder := mock_moira_alert.NewMockEventBuilder(mockCtrl) + systemClock := mock_clock.NewMockClock(mockCtrl) worker := FetchEventsWorker{ - Database: dataBase, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(dataBase, logger, notifierMetrics), - Config: emptyNotifierConfig, + Database: dataBase, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + dataBase, + logger, + notifierMetrics, + notifier.SchedulerConfig{ + ReschedulingDelay: emptyNotifierConfig.ReschedulingDelay, + }, + systemClock), + Config: emptyNotifierConfig, } event := moira.NotificationEvent{ @@ -185,6 +221,7 @@ func TestSubscriptionsManagedToIgnoreEvents(t *testing.T) { dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) logger := mock_moira_alert.NewMockLogger(mockCtrl) eventBuilder := mock_moira_alert.NewMockEventBuilder(mockCtrl) + systemClock := mock_clock.NewMockClock(mockCtrl) logger.EXPECT().Clone().Return(logger).AnyTimes() logger.EXPECT().String(gomock.Any(), gomock.Any()).Return(logger).AnyTimes() @@ -192,11 +229,17 @@ func TestSubscriptionsManagedToIgnoreEvents(t *testing.T) { Convey("[TRUE] Do not send WARN notifications", t, func() { worker := FetchEventsWorker{ - Database: dataBase, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(dataBase, logger, notifierMetrics), - Config: emptyNotifierConfig, + Database: dataBase, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + dataBase, + logger, + notifierMetrics, notifier.SchedulerConfig{ + ReschedulingDelay: emptyNotifierConfig.ReschedulingDelay, + }, + systemClock), + Config: emptyNotifierConfig, } event := moira.NotificationEvent{ @@ -228,11 +271,18 @@ func TestSubscriptionsManagedToIgnoreEvents(t *testing.T) { }) Convey("[TRUE] Send notifications when triggers degraded only", t, func() { worker := FetchEventsWorker{ - Database: dataBase, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(dataBase, logger, notifierMetrics), - Config: emptyNotifierConfig, + Database: dataBase, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + dataBase, + logger, + notifierMetrics, + notifier.SchedulerConfig{ + ReschedulingDelay: emptyNotifierConfig.ReschedulingDelay, + }, + systemClock), + Config: emptyNotifierConfig, } event := moira.NotificationEvent{ @@ -264,11 +314,18 @@ func TestSubscriptionsManagedToIgnoreEvents(t *testing.T) { }) Convey("[TRUE] Do not send WARN notifications & [TRUE] Send notifications when triggers degraded only", t, func() { worker := FetchEventsWorker{ - Database: dataBase, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(dataBase, logger, notifierMetrics), - Config: emptyNotifierConfig, + Database: dataBase, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + dataBase, + logger, + notifierMetrics, + notifier.SchedulerConfig{ + ReschedulingDelay: emptyNotifierConfig.ReschedulingDelay, + }, + systemClock), + Config: emptyNotifierConfig, } event := moira.NotificationEvent{ @@ -315,6 +372,7 @@ func TestAddNotification(t *testing.T) { dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) logger, _ := logging.GetLogger("Events") scheduler := mock_scheduler.NewMockScheduler(mockCtrl) + worker := FetchEventsWorker{ Database: dataBase, Logger: logger, @@ -331,11 +389,19 @@ func TestAddNotification(t *testing.T) { SubscriptionID: &subscription.ID, } emptyNotification := moira.ScheduledNotification{} + params := moira.SchedulerParams{ + Event: event, + Trigger: triggerData, + Contact: contact, + Plotting: emptyNotification.Plotting, + ThrottledOld: false, + SendFail: 0, + } dataBase.EXPECT().GetTrigger(event.TriggerID).Return(trigger, nil) dataBase.EXPECT().GetTagsSubscriptions(triggerData.Tags).Times(1).Return([]*moira.SubscriptionData{&subscription}, nil) dataBase.EXPECT().GetContact(contact.ID).Times(1).Return(contact, nil) - scheduler.EXPECT().ScheduleNotification(gomock.Any(), event, triggerData, contact, emptyNotification.Plotting, false, 0, gomock.Any()).Times(1).Return(&emptyNotification) + scheduler.EXPECT().ScheduleNotification(params, gomock.Any()).Times(1).Return(&emptyNotification) dataBase.EXPECT().AddNotification(&emptyNotification).Times(1).Return(nil) err := worker.processEvent(event) @@ -350,6 +416,7 @@ func TestAddOneNotificationByTwoSubscriptionsWithSame(t *testing.T) { dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) logger, _ := logging.GetLogger("Events") scheduler := mock_scheduler.NewMockScheduler(mockCtrl) + worker := FetchEventsWorker{ Database: dataBase, Logger: logger, @@ -370,12 +437,23 @@ func TestAddOneNotificationByTwoSubscriptionsWithSame(t *testing.T) { notification2 := moira.ScheduledNotification{} + params := moira.SchedulerParams{ + Event: event, + Trigger: triggerData, + Contact: contact, + Plotting: notification2.Plotting, + ThrottledOld: false, + SendFail: 0, + } + params2 := params + params2.Event = event2 + dataBase.EXPECT().GetTrigger(event.TriggerID).Return(trigger, nil) dataBase.EXPECT().GetTagsSubscriptions(triggerData.Tags).Times(1).Return([]*moira.SubscriptionData{&subscription, &subscription4}, nil) dataBase.EXPECT().GetContact(contact.ID).Times(2).Return(contact, nil) - scheduler.EXPECT().ScheduleNotification(gomock.Any(), event, triggerData, contact, notification2.Plotting, false, 0, gomock.Any()).Times(1).Return(¬ification2) - scheduler.EXPECT().ScheduleNotification(gomock.Any(), event2, triggerData, contact, notification2.Plotting, false, 0, gomock.Any()).Times(1).Return(¬ification2) + scheduler.EXPECT().ScheduleNotification(params, gomock.Any()).Times(1).Return(¬ification2) + scheduler.EXPECT().ScheduleNotification(params2, gomock.Any()).Times(1).Return(¬ification2) dataBase.EXPECT().AddNotification(¬ification2).Times(1).Return(nil) @@ -391,13 +469,21 @@ func TestFailReadContact(t *testing.T) { dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) logger := mock_moira_alert.NewMockLogger(mockCtrl) eventBuilder := mock_moira_alert.NewMockEventBuilder(mockCtrl) + systemClock := mock_clock.NewMockClock(mockCtrl) worker := FetchEventsWorker{ - Database: dataBase, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(dataBase, logger, notifierMetrics), - Config: emptyNotifierConfig, + Database: dataBase, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + dataBase, + logger, + notifierMetrics, + notifier.SchedulerConfig{ + ReschedulingDelay: emptyNotifierConfig.ReschedulingDelay, + }, + systemClock), + Config: emptyNotifierConfig, } event := moira.NotificationEvent{ @@ -439,6 +525,7 @@ func TestEmptySubscriptions(t *testing.T) { mockCtrl := gomock.NewController(t) logger := mock_moira_alert.NewMockLogger(mockCtrl) eventBuilder := mock_moira_alert.NewMockEventBuilder(mockCtrl) + systemClock := mock_clock.NewMockClock(mockCtrl) logger.EXPECT().Clone().Return(logger).AnyTimes() logger.EXPECT().String(gomock.Any(), gomock.Any()).Return(logger).AnyTimes() @@ -449,11 +536,18 @@ func TestEmptySubscriptions(t *testing.T) { dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) worker := FetchEventsWorker{ - Database: dataBase, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(dataBase, logger, notifierMetrics), - Config: emptyNotifierConfig, + Database: dataBase, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + dataBase, + logger, + notifierMetrics, + notifier.SchedulerConfig{ + ReschedulingDelay: emptyNotifierConfig.ReschedulingDelay, + }, + systemClock), + Config: emptyNotifierConfig, } event := moira.NotificationEvent{ @@ -484,11 +578,18 @@ func TestEmptySubscriptions(t *testing.T) { mockCtrl := gomock.NewController(t) dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) worker := FetchEventsWorker{ - Database: dataBase, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(dataBase, logger, notifierMetrics), - Config: emptyNotifierConfig, + Database: dataBase, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + dataBase, + logger, + notifierMetrics, + notifier.SchedulerConfig{ + ReschedulingDelay: emptyNotifierConfig.ReschedulingDelay, + }, + systemClock), + Config: emptyNotifierConfig, } event := moira.NotificationEvent{ @@ -521,13 +622,21 @@ func TestGetNotificationSubscriptions(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) + systemClock := mock_clock.NewMockClock(mockCtrl) logger, _ := logging.GetLogger("Events") worker := FetchEventsWorker{ - Database: dataBase, - Logger: logger, - Metrics: notifierMetrics, - Scheduler: notifier.NewScheduler(dataBase, logger, notifierMetrics), - Config: emptyNotifierConfig, + Database: dataBase, + Logger: logger, + Metrics: notifierMetrics, + Scheduler: notifier.NewScheduler( + dataBase, + logger, + notifierMetrics, + notifier.SchedulerConfig{ + ReschedulingDelay: emptyNotifierConfig.ReschedulingDelay, + }, + systemClock), + Config: emptyNotifierConfig, } Convey("Error GetSubscription", t, func() { @@ -579,6 +688,14 @@ func TestGoRoutine(t *testing.T) { SubscriptionID: &subscription.ID, } emptyNotification := moira.ScheduledNotification{} + params := moira.SchedulerParams{ + Event: event, + Trigger: triggerData, + Contact: contact, + Plotting: emptyNotification.Plotting, + ThrottledOld: false, + SendFail: 0, + } shutdown := make(chan struct{}) dataBase.EXPECT().FetchNotificationEvent().Return(moira.NotificationEvent{}, fmt.Errorf("3433434")).Do(func() { @@ -589,7 +706,7 @@ func TestGoRoutine(t *testing.T) { dataBase.EXPECT().GetTrigger(event.TriggerID).Times(1).Return(trigger, nil) dataBase.EXPECT().GetTagsSubscriptions(triggerData.Tags).Times(1).Return([]*moira.SubscriptionData{&subscription}, nil) dataBase.EXPECT().GetContact(contact.ID).Times(1).Return(contact, nil) - scheduler.EXPECT().ScheduleNotification(gomock.Any(), event, triggerData, contact, emptyNotification.Plotting, false, 0, gomock.Any()).Times(1).Return(&emptyNotification) + scheduler.EXPECT().ScheduleNotification(params, gomock.Any()).Times(1).Return(&emptyNotification) dataBase.EXPECT().AddNotification(&emptyNotification).Times(1).Return(nil).Do(func(f ...interface{}) { close(shutdown) }) worker.Start() diff --git a/notifier/notifier.go b/notifier/notifier.go index 99b1e40ca..6a31134a0 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -85,12 +85,15 @@ type StandardNotifier struct { } // NewNotifier is initializer for StandardNotifier. -func NewNotifier(database moira.Database, logger moira.Logger, config Config, metrics *metrics.NotifierMetrics, metricSourceProvider *metricSource.SourceProvider, imageStoreMap map[string]moira.ImageStore) *StandardNotifier { +func NewNotifier(database moira.Database, logger moira.Logger, config Config, metrics *metrics.NotifierMetrics, + metricSourceProvider *metricSource.SourceProvider, imageStoreMap map[string]moira.ImageStore, clock moira.Clock, + schedulerConfig SchedulerConfig, +) *StandardNotifier { return &StandardNotifier{ senders: make(map[string]chan NotificationPackage), logger: logger, database: database, - scheduler: NewScheduler(database, logger, metrics), + scheduler: NewScheduler(database, logger, metrics, schedulerConfig, clock), config: config, metrics: metrics, metricSourceProvider: metricSourceProvider, @@ -158,14 +161,21 @@ func (notifier *StandardNotifier) reschedule(pkg *NotificationPackage, reason st logger.Warning(). Int("number_of_retries", pkg.FailCount). String("reason", reason). - Msg("Can't send message. Retry again in 1 min") + Msg(fmt.Sprintf("Can't send message. Retry again in %s", notifier.config.ReschedulingDelay)) for _, event := range pkg.Events { subID := moira.UseString(event.SubscriptionID) eventLogger := logger.Clone().String(moira.LogFieldNameSubscriptionID, subID) SetLogLevelByConfig(notifier.config.LogSubscriptionsToLevel, subID, &eventLogger) - notification := notifier.scheduler.ScheduleNotification(time.Now(), event, - pkg.Trigger, pkg.Contact, pkg.Plotting, pkg.Throttled, pkg.FailCount+1, eventLogger) + params := moira.SchedulerParams{ + Event: event, + Trigger: pkg.Trigger, + Contact: pkg.Contact, + Plotting: pkg.Plotting, + ThrottledOld: pkg.Throttled, + SendFail: pkg.FailCount + 1, + } + notification := notifier.scheduler.ScheduleNotification(params, eventLogger) if err := notifier.database.AddNotification(notification); err != nil { eventLogger.Error(). Error(err). @@ -239,5 +249,5 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific } func (notifier *StandardNotifier) needToStop(failCount int) bool { - return time.Duration(failCount)*time.Minute > notifier.config.ResendingTimeout + return time.Duration(failCount)*notifier.config.ReschedulingDelay > notifier.config.ResendingTimeout } diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index 5813766b9..79d7b43e7 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -14,6 +14,7 @@ import ( "github.com/moira-alert/moira" "github.com/moira-alert/moira/metrics" + mock_clock "github.com/moira-alert/moira/mock/clock" mock_moira_alert "github.com/moira-alert/moira/mock/moira-alert" mock_scheduler "github.com/moira-alert/moira/mock/scheduler" ) @@ -34,10 +35,11 @@ var ( shutdown = make(chan struct{}) location, _ = time.LoadLocation("UTC") defaultConfig = Config{ - SendingTimeout: 10 * time.Millisecond, - ResendingTimeout: time.Hour * 24, - Location: location, - DateTimeFormat: dateTimeFormat, + SendingTimeout: 10 * time.Millisecond, + ResendingTimeout: time.Hour * 24, + ReschedulingDelay: time.Minute, + Location: location, + DateTimeFormat: dateTimeFormat, Senders: []map[string]interface{}{ { "sender_type": "test_sender_type", @@ -103,8 +105,17 @@ func TestUnknownContactType(t *testing.T) { Type: "unknown contact", }, } + params := moira.SchedulerParams{ + Event: event, + Trigger: pkg.Trigger, + Contact: pkg.Contact, + Plotting: pkg.Plotting, + ThrottledOld: pkg.Throttled, + SendFail: pkg.FailCount + 1, + } notification := moira.ScheduledNotification{} - scheduler.EXPECT().ScheduleNotification(gomock.Any(), event, pkg.Trigger, pkg.Contact, pkg.Plotting, pkg.Throttled, pkg.FailCount+1, gomock.Any()).Return(¬ification) + + scheduler.EXPECT().ScheduleNotification(params, gomock.Any()).Return(¬ification) dataBase.EXPECT().AddNotification(¬ification).Return(nil) var wg sync.WaitGroup @@ -124,9 +135,18 @@ func TestFailSendEvent(t *testing.T) { Type: "test_contact_type", }, } + params := moira.SchedulerParams{ + Event: event, + Trigger: pkg.Trigger, + Contact: pkg.Contact, + Plotting: pkg.Plotting, + ThrottledOld: pkg.Throttled, + SendFail: pkg.FailCount + 1, + } notification := moira.ScheduledNotification{} + sender.EXPECT().SendEvents(eventsData, pkg.Contact, pkg.Trigger, plots, pkg.Throttled).Return(fmt.Errorf("Cant't send")) - scheduler.EXPECT().ScheduleNotification(gomock.Any(), event, pkg.Trigger, pkg.Contact, pkg.Plotting, pkg.Throttled, pkg.FailCount+1, gomock.Any()).Return(¬ification) + scheduler.EXPECT().ScheduleNotification(params, gomock.Any()).Return(¬ification) dataBase.EXPECT().AddNotification(¬ification).Return(nil) var wg sync.WaitGroup @@ -194,8 +214,16 @@ func TestTimeout(t *testing.T) { Value: "fail contact", }, } + params := moira.SchedulerParams{ + Event: event, + Trigger: pkg2.Trigger, + Contact: pkg2.Contact, + Plotting: pkg2.Plotting, + ThrottledOld: pkg2.Throttled, + SendFail: pkg2.FailCount + 1, + } - scheduler.EXPECT().ScheduleNotification(gomock.Any(), event, pkg2.Trigger, pkg2.Contact, pkg.Plotting, pkg2.Throttled, pkg2.FailCount+1, gomock.Any()).Return(¬ification) + scheduler.EXPECT().ScheduleNotification(params, gomock.Any()).Return(¬ification) dataBase.EXPECT().AddNotification(¬ification).Return(nil).Do(func(f ...interface{}) { close(shutdown) }) standardNotifier.Send(&pkg2, &wg) @@ -222,8 +250,20 @@ func configureNotifier(t *testing.T, config Config) { scheduler = mock_scheduler.NewMockScheduler(mockCtrl) sender = mock_moira_alert.NewMockSender(mockCtrl) metricsSourceProvider := metricSource.CreateTestMetricSourceProvider(local.Create(dataBase), nil, nil) - - standardNotifier = NewNotifier(dataBase, logger, config, notifierMetrics, metricsSourceProvider, map[string]moira.ImageStore{}) + systemClock := mock_clock.NewMockClock(mockCtrl) + + schedulerConfig := SchedulerConfig{ReschedulingDelay: config.ReschedulingDelay} + + standardNotifier = NewNotifier( + dataBase, + logger, + config, + notifierMetrics, + metricsSourceProvider, + map[string]moira.ImageStore{}, + systemClock, + schedulerConfig, + ) standardNotifier.scheduler = scheduler senderSettings := map[string]interface{}{ "sender_type": "test_type", diff --git a/notifier/scheduler.go b/notifier/scheduler.go index fdbc6eae3..0868c7f8a 100644 --- a/notifier/scheduler.go +++ b/notifier/scheduler.go @@ -10,14 +10,20 @@ import ( // Scheduler implements event scheduling functionality. type Scheduler interface { - ScheduleNotification(now time.Time, event moira.NotificationEvent, trigger moira.TriggerData, - contact moira.ContactData, plotting moira.PlottingData, throttledOld bool, sendFail int, logger moira.Logger) *moira.ScheduledNotification + ScheduleNotification(params moira.SchedulerParams, logger moira.Logger) *moira.ScheduledNotification +} + +// SchedulerConfig is a list of immutable params for Scheduler. +type SchedulerConfig struct { + ReschedulingDelay time.Duration } // StandardScheduler represents standard event scheduling. type StandardScheduler struct { database moira.Database metrics *metrics.NotifierMetrics + config SchedulerConfig + clock moira.Clock } type throttlingLevel struct { @@ -27,41 +33,44 @@ type throttlingLevel struct { } // NewScheduler is initializer for StandardScheduler. -func NewScheduler(database moira.Database, logger moira.Logger, metrics *metrics.NotifierMetrics) *StandardScheduler { +func NewScheduler(database moira.Database, logger moira.Logger, metrics *metrics.NotifierMetrics, config SchedulerConfig, clock moira.Clock, +) *StandardScheduler { return &StandardScheduler{ database: database, metrics: metrics, + config: config, + clock: clock, } } // ScheduleNotification is realization of scheduling event, based on trigger and subscription time intervals and triggers settings. -func (scheduler *StandardScheduler) ScheduleNotification(now time.Time, event moira.NotificationEvent, trigger moira.TriggerData, - contact moira.ContactData, plotting moira.PlottingData, throttledOld bool, sendFail int, logger moira.Logger, +func (scheduler *StandardScheduler) ScheduleNotification(params moira.SchedulerParams, logger moira.Logger, ) *moira.ScheduledNotification { var ( next time.Time throttled bool ) - if sendFail > 0 { - next = now.Add(time.Minute) - throttled = throttledOld + now := scheduler.clock.Now() + if params.SendFail > 0 { + next = now.Add(scheduler.config.ReschedulingDelay) + throttled = params.ThrottledOld } else { - if event.State == moira.StateTEST { + if params.Event.State == moira.StateTEST { next = now throttled = false } else { - next, throttled = scheduler.calculateNextDelivery(now, &event, logger) + next, throttled = scheduler.calculateNextDelivery(now, ¶ms.Event, logger) } } notification := &moira.ScheduledNotification{ - Event: event, - Trigger: trigger, - Contact: contact, + Event: params.Event, + Trigger: params.Trigger, + Contact: params.Contact, Throttled: throttled, - SendFail: sendFail, + SendFail: params.SendFail, Timestamp: next.Unix(), CreatedAt: now.Unix(), - Plotting: plotting, + Plotting: params.Plotting, } logger.Debug(). diff --git a/notifier/scheduler_test.go b/notifier/scheduler_test.go index b9e800d58..88446769a 100644 --- a/notifier/scheduler_test.go +++ b/notifier/scheduler_test.go @@ -9,6 +9,7 @@ import ( "github.com/moira-alert/moira" logging "github.com/moira-alert/moira/logging/zerolog_adapter" "github.com/moira-alert/moira/metrics" + mock_clock "github.com/moira-alert/moira/mock/clock" mock_moira_alert "github.com/moira-alert/moira/mock/moira-alert" . "github.com/smartystreets/goconvey/convey" ) @@ -49,9 +50,19 @@ func TestThrottling(t *testing.T) { dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) logger, _ := logging.GetLogger("Scheduler") metrics2 := metrics.ConfigureNotifierMetrics(metrics.NewDummyRegistry(), "notifier") - scheduler := NewScheduler(dataBase, logger, metrics2) now := time.Now() + systemClock := mock_clock.NewMockClock(mockCtrl) + scheduler := NewScheduler(dataBase, logger, metrics2, SchedulerConfig{ReschedulingDelay: time.Minute}, systemClock) + + params := moira.SchedulerParams{ + Event: event, + Trigger: trigger, + Contact: contact, + Plotting: plottingData, + ThrottledOld: false, + SendFail: 0, + } expected := moira.ScheduledNotification{ Event: event, @@ -65,21 +76,31 @@ func TestThrottling(t *testing.T) { } Convey("Test sendFail more than 0, and no throttling, should send message in one minute", t, func() { + params2 := params + params2.ThrottledOld = false + params2.SendFail = 1 + expected2 := expected expected2.SendFail = 1 expected2.Timestamp = now.Add(time.Minute).Unix() + systemClock.EXPECT().Now().Return(now).Times(1) - notification := scheduler.ScheduleNotification(now, event, trigger, contact, plottingData, false, 1, logger) + notification := scheduler.ScheduleNotification(params2, logger) So(notification, ShouldResemble, &expected2) }) Convey("Test sendFail more than 0, and has throttling, should send message in one minute", t, func() { + params2 := params + params2.ThrottledOld = true + params2.SendFail = 3 + expected2 := expected expected2.SendFail = 3 expected2.Timestamp = now.Add(time.Minute).Unix() expected2.Throttled = true + systemClock.EXPECT().Now().Return(now).Times(1) - notification := scheduler.ScheduleNotification(now, event, trigger, contact, plottingData, true, 3, logger) + notification := scheduler.ScheduleNotification(params2, logger) So(notification, ShouldResemble, &expected2) }) @@ -93,10 +114,14 @@ func TestThrottling(t *testing.T) { SubscriptionID: &subID, } + params2 := params + params2.Event = testEvent + expected3 := expected expected3.Event = testEvent + systemClock.EXPECT().Now().Return(now).Times(1) - notification := scheduler.ScheduleNotification(now, testEvent, trigger, contact, plottingData, false, 0, logger) + notification := scheduler.ScheduleNotification(params2, logger) So(notification, ShouldResemble, &expected3) }) @@ -104,7 +129,10 @@ func TestThrottling(t *testing.T) { dataBase.EXPECT().GetTriggerThrottling(trigger.ID).Times(1).Return(time.Unix(0, 0), time.Unix(0, 0)) dataBase.EXPECT().GetSubscription(*event.SubscriptionID).Times(1).Return(moira.SubscriptionData{}, fmt.Errorf("Error while read subscription")) - notification := scheduler.ScheduleNotification(now, event, trigger, contact, plottingData, false, 0, logger) + params2 := params + systemClock.EXPECT().Now().Return(now).Times(1) + + notification := scheduler.ScheduleNotification(params2, logger) So(notification, ShouldResemble, &expected) }) } @@ -132,7 +160,8 @@ func TestSubscriptionSchedule(t *testing.T) { dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) logger, _ := logging.GetLogger("Scheduler") notifierMetrics := metrics.ConfigureNotifierMetrics(metrics.NewDummyRegistry(), "notifier") - scheduler := NewScheduler(dataBase, logger, notifierMetrics) + systemClock := mock_clock.NewMockClock(mockCtrl) + scheduler := NewScheduler(dataBase, logger, notifierMetrics, SchedulerConfig{ReschedulingDelay: time.Minute}, systemClock) Convey("Throttling disabled", t, func() { now := time.Unix(1441187115, 0)