diff --git a/cmd/notifier/config.go b/cmd/notifier/config.go index eb737a798..b9bc5a030 100644 --- a/cmd/notifier/config.go +++ b/cmd/notifier/config.go @@ -56,6 +56,8 @@ type notifierConfig struct { MaxFailAttemptToSendAvailable int `yaml:"max_fail_attempt_to_send_available"` // Specify log level by entities SetLogLevel setLogLevelConfig `yaml:"set_log_level"` + // CheckNotifierStateTimeout is the timeout between marking *.alive.count metric based on notifier state. + CheckNotifierStateTimeout string `yaml:"check_notifier_state_timeout"` } type selfStateConfig struct { @@ -116,6 +118,7 @@ func getDefault() config { Timezone: "UTC", ReadBatchSize: int(notifier.NotificationsLimitUnlimited), MaxFailAttemptToSendAvailable: 3, + CheckNotifierStateTimeout: "10s", }, Telemetry: cmd.TelemetryConfig{ Listen: ":8093", @@ -202,6 +205,7 @@ func (config *notifierConfig) getSettings(logger moira.Logger) notifier.Config { MaxFailAttemptToSendAvailable: config.MaxFailAttemptToSendAvailable, LogContactsToLevel: contacts, LogSubscriptionsToLevel: subscriptions, + CheckNotifierStateTimeout: to.Duration(config.CheckNotifierStateTimeout), } } diff --git a/cmd/notifier/main.go b/cmd/notifier/main.go index 37e1ac7f7..7c9ea395d 100644 --- a/cmd/notifier/main.go +++ b/cmd/notifier/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "os" @@ -99,6 +100,7 @@ func main() { } notifierMetrics := metrics.ConfigureNotifierMetrics(telemetry.Metrics, serviceName) + sender := notifier.NewNotifier( database, logger, @@ -119,7 +121,7 @@ func main() { // Start moira self state checker if config.Notifier.SelfState.getSettings().Enabled { - selfState := selfstate.NewSelfCheckWorker(logger, database, sender, config.Notifier.SelfState.getSettings(), metrics.ConfigureHeartBeatMetrics(telemetry.Metrics)) + selfState := selfstate.NewSelfCheckWorker(logger, database, sender, config.Notifier.SelfState.getSettings()) if err := selfState.Start(); err != nil { logger.Fatal(). Error(err). 
@@ -156,6 +158,17 @@ func main() { fetchEventsWorker.Start() defer stopFetchEvents(fetchEventsWorker) + aliveWatcher := notifier.NewAliveWatcher( + logger, + database, + notifierConfig.CheckNotifierStateTimeout, + notifierMetrics, + ) + ctx, cancel := context.WithCancel(context.Background()) + + aliveWatcher.Start(ctx) + defer cancel() + logger.Info(). String("moira_version", MoiraVersion). Msg("Moira Notifier Started") diff --git a/metrics/heartbeat.go b/metrics/heartbeat.go deleted file mode 100644 index 378e158cf..000000000 --- a/metrics/heartbeat.go +++ /dev/null @@ -1,22 +0,0 @@ -package metrics - -// HeartBeatMetrics is a collection of metrics used in hearbeats. -type HeartBeatMetrics struct { - notifierIsAlive Meter -} - -// ConfigureHeartBeatMetrics is notifier metrics configurator. -func ConfigureHeartBeatMetrics(registry Registry) *HeartBeatMetrics { - return &HeartBeatMetrics{ - notifierIsAlive: registry.NewMeter("", "alive"), - } -} - -// MarkNotifierIsAlive marks metric value. -func (hb HeartBeatMetrics) MarkNotifierIsAlive(isAlive bool) { - if isAlive { - hb.notifierIsAlive.Mark(1) - } - - hb.notifierIsAlive.Mark(0) -} diff --git a/metrics/notifier.go b/metrics/notifier.go index 8711ef11c..3af294d63 100644 --- a/metrics/notifier.go +++ b/metrics/notifier.go @@ -1,6 +1,8 @@ package metrics -import "time" +import ( + "time" +) // NotifierMetrics is a collection of metrics used in notifier. type NotifierMetrics struct { @@ -16,6 +18,7 @@ type NotifierMetrics struct { PlotsBuildDurationMs Histogram PlotsEvaluateTriggerDurationMs Histogram fetchNotificationsDurationMs Histogram + notifierIsAlive Meter } // ConfigureNotifierMetrics is notifier metrics configurator. 
@@ -33,6 +36,7 @@ func ConfigureNotifierMetrics(registry Registry, prefix string) *NotifierMetrics PlotsBuildDurationMs: registry.NewHistogram("plots", "build", "duration", "ms"), PlotsEvaluateTriggerDurationMs: registry.NewHistogram("plots", "evaluate", "trigger", "duration", "ms"), fetchNotificationsDurationMs: registry.NewHistogram("fetch", "notifications", "duration", "ms"), + notifierIsAlive: registry.NewMeter("", "alive"), } } @@ -66,3 +70,13 @@ func (metrics *NotifierMetrics) MarkSendersFailedMetrics(contactType string) { func (metrics *NotifierMetrics) MarkSendingFailed() { metrics.SendingFailed.Mark(1) } + +// MarkNotifierIsAlive marks metric value. +func (metrics *NotifierMetrics) MarkNotifierIsAlive(isAlive bool) { + if isAlive { + metrics.notifierIsAlive.Mark(1) + return + } + + metrics.notifierIsAlive.Mark(0) +} diff --git a/notifier/alive_watcher.go b/notifier/alive_watcher.go new file mode 100644 index 000000000..962e017b0 --- /dev/null +++ b/notifier/alive_watcher.go @@ -0,0 +1,70 @@ +package notifier + +import ( + "context" + "time" + + "github.com/moira-alert/moira" + "github.com/moira-alert/moira/metrics" +) + +// AliveWatcher is responsible for checking notifier state and marking notifier.alive metrics. +type AliveWatcher struct { + logger moira.Logger + database moira.Database + checkNotifierStateTimeout time.Duration + notifierMetrics *metrics.NotifierMetrics +} + +// NewAliveWatcher is an initializer for AliveWatcher. +func NewAliveWatcher( + logger moira.Logger, + database moira.Database, + checkNotifierStateTimeout time.Duration, + notifierMetrics *metrics.NotifierMetrics, +) *AliveWatcher { + return &AliveWatcher{ + logger: logger, + database: database, + checkNotifierStateTimeout: checkNotifierStateTimeout, + notifierMetrics: notifierMetrics, + } +} + +// Start starts the checking loop in separate goroutine. +// Use context.WithCancel, context.WithTimeout etc. to terminate check loop. 
+func (watcher *AliveWatcher) Start(ctx context.Context) { + go watcher.stateChecker(ctx) +} + +// stateChecker runs checkNotifierState on every tick of checkNotifierStateTimeout until ctx is done. +func (watcher *AliveWatcher) stateChecker(ctx context.Context) { + watcher.logger.Info(). + Interface("check_timeout_seconds", watcher.checkNotifierStateTimeout.Seconds()). + Msg("Moira Notifier alive watcher started") + + ticker := time.NewTicker(watcher.checkNotifierStateTimeout) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + watcher.logger.Info().Msg("Moira Notifier alive watcher stopped") + return + case <-ticker.C: + watcher.checkNotifierState() + } + } +} + +// checkNotifierState marks the alive metric with 1 when the stored notifier state is moira.SelfStateOK and with 0 otherwise. +func (watcher *AliveWatcher) checkNotifierState() { + // The error is not inspected: only the returned state decides the metric value. + state, _ := watcher.database.GetNotifierState() + if state != moira.SelfStateOK { + watcher.notifierMetrics.MarkNotifierIsAlive(false) + return + } + + watcher.notifierMetrics.MarkNotifierIsAlive(true) +} diff --git a/notifier/alive_watcher_test.go b/notifier/alive_watcher_test.go new file mode 100644 index 000000000..3ab25818b --- /dev/null +++ b/notifier/alive_watcher_test.go @@ -0,0 +1,112 @@ +package notifier + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/moira-alert/moira" + "github.com/moira-alert/moira/metrics" + . 
"github.com/smartystreets/goconvey/convey" + "go.uber.org/mock/gomock" + + mock_moira_alert "github.com/moira-alert/moira/mock/moira-alert" + mock_metrics "github.com/moira-alert/moira/mock/moira-alert/metrics" +) + +func initAliveMeter(mockCtrl *gomock.Controller) (*mock_metrics.MockRegistry, *mock_metrics.MockMeter) { + mockRegistry := mock_metrics.NewMockRegistry(mockCtrl) + mockAliveMeter := mock_metrics.NewMockMeter(mockCtrl) + + mockRegistry.EXPECT().NewMeter(gomock.Any()).Times(5) + mockRegistry.EXPECT().NewHistogram(gomock.Any()).Times(3) + mockRegistry.EXPECT().NewMeter("", "alive").Return(mockAliveMeter) + + return mockRegistry, mockAliveMeter +} + +func TestAliveWatcher_checkNotifierState(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) + + mockRegistry, mockAliveMeter := initAliveMeter(mockCtrl) + testNotifierMetrics := metrics.ConfigureNotifierMetrics(mockRegistry, "") + + aliveWatcher := NewAliveWatcher(nil, dataBase, 0, testNotifierMetrics) + + Convey("checkNotifierState", t, func() { + Convey("when OK", func() { + dataBase.EXPECT().GetNotifierState().Return(moira.SelfStateOK, nil) + mockAliveMeter.EXPECT().Mark(int64(1)) + + aliveWatcher.checkNotifierState() + }) + + Convey("when not OK state and no errors", func() { + notOKStates := []string{moira.SelfStateERROR, "err", "bad", "", "1"} + + for _, badState := range notOKStates { + dataBase.EXPECT().GetNotifierState().Return(badState, nil) + mockAliveMeter.EXPECT().Mark(int64(0)) + + aliveWatcher.checkNotifierState() + } + }) + + Convey("when not OK state and errors", func() { + notOKState := "" + givenErrors := []error{ + errors.New("one error"), + errors.New("another error"), + } + + for _, err := range givenErrors { + dataBase.EXPECT().GetNotifierState().Return(notOKState, err) + mockAliveMeter.EXPECT().Mark(int64(0)) + + aliveWatcher.checkNotifierState() + } + }) + }) +} + +func TestAliveWatcher_Start(t 
*testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + logger := mock_moira_alert.NewMockLogger(mockCtrl) + eventsBuilder := mock_moira_alert.NewMockEventBuilder(mockCtrl) + logger.EXPECT().Info().Return(eventsBuilder).AnyTimes() + + dataBase := mock_moira_alert.NewMockDatabase(mockCtrl) + + const ( + testCheckNotifierStateTimeout = time.Second + ) + + mockRegistry, mockAliveMeter := initAliveMeter(mockCtrl) + testNotifierMetrics := metrics.ConfigureNotifierMetrics(mockRegistry, "") + + aliveWatcher := NewAliveWatcher(logger, dataBase, testCheckNotifierStateTimeout, testNotifierMetrics) + + Convey("AliveWatcher stops on cancel", t, func() { + eventsBuilder.EXPECT(). + Interface("check_timeout_seconds", testCheckNotifierStateTimeout.Seconds()). + Return(eventsBuilder) + eventsBuilder.EXPECT().Msg("Moira Notifier alive watcher started") + eventsBuilder.EXPECT().Msg("Moira Notifier alive watcher stopped") + + dataBase.EXPECT().GetNotifierState().Return(moira.SelfStateOK, nil).AnyTimes() + mockAliveMeter.EXPECT().Mark(int64(1)).AnyTimes() + + ctx, cancel := context.WithCancel(context.Background()) + aliveWatcher.Start(ctx) + + time.Sleep(time.Second * 3) + cancel() + time.Sleep(time.Millisecond) + }) +} diff --git a/notifier/config.go b/notifier/config.go index 34c6e53d4..8e1abe66b 100644 --- a/notifier/config.go +++ b/notifier/config.go @@ -25,4 +25,5 @@ type Config struct { MaxFailAttemptToSendAvailable int LogContactsToLevel map[string]string LogSubscriptionsToLevel map[string]string + CheckNotifierStateTimeout time.Duration } diff --git a/notifier/selfstate/heartbeat/notifier.go b/notifier/selfstate/heartbeat/notifier.go index 7b6877be4..0b418d0c9 100644 --- a/notifier/selfstate/heartbeat/notifier.go +++ b/notifier/selfstate/heartbeat/notifier.go @@ -3,37 +3,30 @@ package heartbeat import ( "fmt" - "github.com/moira-alert/moira/metrics" - "github.com/moira-alert/moira" ) type notifier struct { - db moira.Database - log moira.Logger - 
metrics *metrics.HeartBeatMetrics + db moira.Database + log moira.Logger } -func GetNotifier(logger moira.Logger, database moira.Database, metrics *metrics.HeartBeatMetrics) Heartbeater { +func GetNotifier(logger moira.Logger, database moira.Database) Heartbeater { return &notifier{ - db: database, - log: logger, - metrics: metrics, + db: database, + log: logger, } } func (check notifier) Check(int64) (int64, bool, error) { state, _ := check.db.GetNotifierState() if state != moira.SelfStateOK { - check.metrics.MarkNotifierIsAlive(false) - check.log.Error(). String("error", check.GetErrorMessage()). Msg("Notifier is not healthy") return 0, true, nil } - check.metrics.MarkNotifierIsAlive(true) check.log.Debug(). String("state", state). diff --git a/notifier/selfstate/heartbeat/notifier_test.go b/notifier/selfstate/heartbeat/notifier_test.go index 3d4976035..ba56a6d7d 100644 --- a/notifier/selfstate/heartbeat/notifier_test.go +++ b/notifier/selfstate/heartbeat/notifier_test.go @@ -4,8 +4,6 @@ import ( "testing" "time" - "github.com/moira-alert/moira/metrics" - "github.com/moira-alert/moira" mock_moira_alert "github.com/moira-alert/moira/mock/moira-alert" @@ -47,7 +45,6 @@ func TestNotifierState(t *testing.T) { func createNotifierStateTest(t *testing.T) *notifier { mockCtrl := gomock.NewController(t) logger, _ := logging.GetLogger("MetricDelay") - metric := metrics.ConfigureHeartBeatMetrics(metrics.NewDummyRegistry()) - return GetNotifier(logger, mock_moira_alert.NewMockDatabase(mockCtrl), metric).(*notifier) + return GetNotifier(logger, mock_moira_alert.NewMockDatabase(mockCtrl)).(*notifier) } diff --git a/notifier/selfstate/selfstate.go b/notifier/selfstate/selfstate.go index 99bba0ed0..27245cebc 100644 --- a/notifier/selfstate/selfstate.go +++ b/notifier/selfstate/selfstate.go @@ -3,8 +3,6 @@ package selfstate import ( "time" - "github.com/moira-alert/moira/metrics" - "github.com/moira-alert/moira/notifier/selfstate/heartbeat" "gopkg.in/tomb.v2" @@ -30,9 +28,15 @@ type 
SelfCheckWorker struct { } // NewSelfCheckWorker creates SelfCheckWorker. -func NewSelfCheckWorker(logger moira.Logger, database moira.Database, notifier notifier.Notifier, config Config, metrics *metrics.HeartBeatMetrics) *SelfCheckWorker { - heartbeats := createStandardHeartbeats(logger, database, config, metrics) - return &SelfCheckWorker{Logger: logger, Database: database, Notifier: notifier, Config: config, heartbeats: heartbeats} +func NewSelfCheckWorker(logger moira.Logger, database moira.Database, notifier notifier.Notifier, config Config) *SelfCheckWorker { + heartbeats := createStandardHeartbeats(logger, database, config) + return &SelfCheckWorker{ + Logger: logger, + Database: database, + Notifier: notifier, + Config: config, + heartbeats: heartbeats, + } } // Start self check worker. @@ -49,6 +53,7 @@ func (selfCheck *SelfCheckWorker) Start() error { selfCheck.Database.NewLock(selfStateLockName, selfStateLockTTL), selfCheck.selfStateChecker, ).Run(selfCheck.tomb.Dying()) + return nil }) @@ -61,7 +66,7 @@ func (selfCheck *SelfCheckWorker) Stop() error { return selfCheck.tomb.Wait() } -func createStandardHeartbeats(logger moira.Logger, database moira.Database, conf Config, metrics *metrics.HeartBeatMetrics) []heartbeat.Heartbeater { +func createStandardHeartbeats(logger moira.Logger, database moira.Database, conf Config) []heartbeat.Heartbeater { heartbeats := make([]heartbeat.Heartbeater, 0) if hb := heartbeat.GetDatabase(conf.RedisDisconnectDelaySeconds, logger, database); hb != nil { @@ -80,7 +85,7 @@ func createStandardHeartbeats(logger moira.Logger, database moira.Database, conf heartbeats = append(heartbeats, hb) } - if hb := heartbeat.GetNotifier(logger, database, metrics); hb != nil { + if hb := heartbeat.GetNotifier(logger, database); hb != nil { heartbeats = append(heartbeats, hb) } diff --git a/notifier/selfstate/selfstate_test.go b/notifier/selfstate/selfstate_test.go index 71859dca9..aa94d93fe 100644 --- a/notifier/selfstate/selfstate_test.go 
+++ b/notifier/selfstate/selfstate_test.go @@ -5,8 +5,6 @@ import ( "testing" "time" - "github.com/moira-alert/moira/metrics" - mock_heartbeat "github.com/moira-alert/moira/mock/heartbeat" "github.com/moira-alert/moira/notifier/selfstate/heartbeat" @@ -177,10 +175,8 @@ func configureWorker(t *testing.T, isStart bool) *selfCheckWorkerMock { database.EXPECT().NewLock(gomock.Any(), gomock.Any()).Return(lock) } - metric := &metrics.HeartBeatMetrics{} - return &selfCheckWorkerMock{ - selfCheckWorker: NewSelfCheckWorker(logger, database, notif, conf, metric), + selfCheckWorker: NewSelfCheckWorker(logger, database, notif, conf), database: database, notif: notif, conf: conf,