Skip to content

Commit

Permalink
feat(filter,cli): add remove future metrics in cli and discarding met…
Browse files Browse the repository at this point in the history
…rics written into the future (#1012)
  • Loading branch information
almostinf authored May 7, 2024
1 parent 58f8d7a commit ce13490
Show file tree
Hide file tree
Showing 12 changed files with 629 additions and 110 deletions.
14 changes: 8 additions & 6 deletions cmd/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ type config struct {
}

type cleanupConfig struct {
Whitelist []string `yaml:"whitelist"`
Delete bool `yaml:"delete"`
AddAnonymousToWhitelist bool `json:"add_anonymous_to_whitelist"`
CleanupMetricsDuration string `yaml:"cleanup_metrics_duration"`
Whitelist []string `yaml:"whitelist"`
Delete bool `yaml:"delete"`
AddAnonymousToWhitelist bool `json:"add_anonymous_to_whitelist"`
CleanupMetricsDuration string `yaml:"cleanup_metrics_duration"`
CleanupFutureMetricsDuration string `yaml:"cleanup_future_metrics_duration"`
}

func getDefault() config {
Expand All @@ -30,8 +31,9 @@ func getDefault() config {
DialTimeout: "500ms",
},
Cleanup: cleanupConfig{
Whitelist: []string{},
CleanupMetricsDuration: "-168h",
Whitelist: []string{},
CleanupMetricsDuration: "-168h",
CleanupFutureMetricsDuration: "60m",
},
}
}
35 changes: 25 additions & 10 deletions cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ var plotting = flag.Bool("plotting", false, "enable images in all notifications"
var removeSubscriptions = flag.String("remove-subscriptions", "", "Remove given subscriptions separated by semicolons.")

var (
cleanupUsers = flag.Bool("cleanup-users", false, "Disable/delete contacts and subscriptions of missing users")
cleanupLastChecks = flag.Bool("cleanup-last-checks", false, "Delete abandoned triggers last checks.")
cleanupTags = flag.Bool("cleanup-tags", false, "Delete abandoned tags.")
cleanupMetrics = flag.Bool("cleanup-metrics", false, "Delete outdated metrics.")
cleanupRetentions = flag.Bool("cleanup-retentions", false, "Delete abandoned retentions.")
userDel = flag.String("user-del", "", "Delete all contacts and subscriptions for a user")
fromUser = flag.String("from-user", "", "Transfer subscriptions and contacts from user.")
toUser = flag.String("to-user", "", "Transfer subscriptions and contacts to user.")
cleanupUsers = flag.Bool("cleanup-users", false, "Disable/delete contacts and subscriptions of missing users")
cleanupLastChecks = flag.Bool("cleanup-last-checks", false, "Delete abandoned triggers last checks.")
cleanupTags = flag.Bool("cleanup-tags", false, "Delete abandoned tags.")
cleanupMetrics = flag.Bool("cleanup-metrics", false, "Delete outdated metrics.")
cleanupFutureMetrics = flag.Bool("cleanup-future-metrics", false, "Delete metrics with future timestamps.")
cleanupRetentions = flag.Bool("cleanup-retentions", false, "Delete abandoned retentions.")
userDel = flag.String("user-del", "", "Delete all contacts and subscriptions for a user")
fromUser = flag.String("from-user", "", "Transfer subscriptions and contacts from user.")
toUser = flag.String("to-user", "", "Transfer subscriptions and contacts to user.")
)

var (
Expand Down Expand Up @@ -231,8 +232,8 @@ func main() { //nolint
log := logger.String(moira.LogFieldNameContext, "cleanup-metrics")

log.Info().Msg("Cleanup of outdated metrics started")
err := handleCleanUpOutdatedMetrics(confCleanup, database)
if err != nil {

if err := handleCleanUpOutdatedMetrics(confCleanup, database); err != nil {
log.Error().
Error(err).
Msg("Failed to cleanup outdated metrics")
Expand All @@ -252,6 +253,20 @@ func main() { //nolint
log.Info().Msg("Cleanup of outdated metrics finished")
}

if *cleanupFutureMetrics {
log := logger.String(moira.LogFieldNameContext, "cleanup-future-metrics")

log.Info().Msg("Cleanup of future metrics started")

if err := handleCleanUpFutureMetrics(confCleanup, database); err != nil {
log.Error().
Error(err).
Msg("Failed to cleanup future metrics")
}

log.Info().Msg("Cleanup of future metrics finished")
}

if *cleanupLastChecks {
log := logger.String(moira.LogFieldNameContext, "cleanup-last-checks")

Expand Down
14 changes: 13 additions & 1 deletion cmd/cli/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,23 @@ func handleCleanUpOutdatedMetrics(config cleanupConfig, database moira.Database)
return err
}

err = database.CleanUpOutdatedMetrics(duration)
if err = database.CleanUpOutdatedMetrics(duration); err != nil {
return err
}

return nil
}

// handleCleanUpFutureMetrics parses config.CleanupFutureMetricsDuration and
// passes the resulting duration to database.CleanUpFutureMetrics.
// It returns the parse error when the configured string is not a valid
// duration; otherwise it returns whatever the database call reports.
func handleCleanUpFutureMetrics(config cleanupConfig, database moira.Database) error {
	futureOffset, parseErr := time.ParseDuration(config.CleanupFutureMetricsDuration)
	if parseErr != nil {
		return parseErr
	}

	// The database error (or nil) is handed back to the caller unchanged.
	return database.CleanUpFutureMetrics(futureOffset)
}

Expand Down
40 changes: 36 additions & 4 deletions cmd/cli/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/moira-alert/moira/database/redis"
mocks "github.com/moira-alert/moira/mock/moira-alert"

"github.com/golang/mock/gomock"
Expand All @@ -16,9 +17,40 @@ func TestCleanUpOutdatedMetrics(t *testing.T) {
defer mockCtrl.Finish()
db := mocks.NewMockDatabase(mockCtrl)

Convey("Test cleanup", t, func() {
db.EXPECT().CleanUpOutdatedMetrics(-168 * time.Hour).Return(nil)
err := handleCleanUpOutdatedMetrics(conf.Cleanup, db)
So(err, ShouldBeNil)
Convey("Test cleanup outdated metrics", t, func() {
Convey("With valid duration", func() {
db.EXPECT().CleanUpOutdatedMetrics(-168 * time.Hour).Return(nil)
err := handleCleanUpOutdatedMetrics(conf.Cleanup, db)
So(err, ShouldBeNil)
})

Convey("With invalid duration", func() {
conf.Cleanup.CleanupMetricsDuration = "168h"
db.EXPECT().CleanUpOutdatedMetrics(168 * time.Hour).Return(redis.ErrCleanUpDurationGreaterThanZero)
err := handleCleanUpOutdatedMetrics(conf.Cleanup, db)
So(err, ShouldEqual, redis.ErrCleanUpDurationGreaterThanZero)
})
})
}

func TestCleanUpFutureMetrics(t *testing.T) {
conf := getDefault()
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
db := mocks.NewMockDatabase(mockCtrl)

Convey("Test cleanup future metrics", t, func() {
Convey("With valid duration", func() {
db.EXPECT().CleanUpFutureMetrics(60 * time.Minute).Return(nil)
err := handleCleanUpFutureMetrics(conf.Cleanup, db)
So(err, ShouldBeNil)
})

Convey("With invalid duration", func() {
conf.Cleanup.CleanupFutureMetricsDuration = "-60m"
db.EXPECT().CleanUpFutureMetrics(-60 * time.Minute).Return(redis.ErrCleanUpDurationLessThanZero)
err := handleCleanUpFutureMetrics(conf.Cleanup, db)
So(err, ShouldEqual, redis.ErrCleanUpDurationLessThanZero)
})
})
}
55 changes: 36 additions & 19 deletions database/redis/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"gopkg.in/tomb.v2"
)

// Sentinel errors returned by the metric clean-up methods when the passed
// duration has the wrong sign.
var (
// ErrCleanUpDurationLessThanZero is returned by CleanUpFutureMetrics when the duration is zero or negative.
ErrCleanUpDurationLessThanZero = errors.New("clean up duration value must be greater than zero, otherwise the current metrics may be deleted")
// ErrCleanUpDurationGreaterThanZero is returned by CleanUpOutdatedMetrics when the duration is zero or positive.
ErrCleanUpDurationGreaterThanZero = errors.New("clean up duration value must be less than zero, otherwise all metrics will be removed")
)

func (connector *DbConnector) addPatterns(patterns ...string) error {
ctx := connector.context
client := *connector.client
Expand Down Expand Up @@ -321,15 +326,17 @@ func (connector *DbConnector) RemoveMetricRetention(metric string) error {
return nil
}

// RemoveMetricValues remove metric timestamps values from 0 to given time.
func (connector *DbConnector) RemoveMetricValues(metric string, toTime int64) (int64, error) {
// RemoveMetricValues removes the values of the given metric that fall within the passed interval, unless the metric is in the metricsCache.
// from and to are each expected to be -inf, +inf, or a timestamp formatted as a string.
func (connector *DbConnector) RemoveMetricValues(metric string, from, to string) (int64, error) {
if !connector.needRemoveMetrics(metric) {
return 0, nil
}

c := *connector.client
result, err := c.ZRemRangeByScore(connector.context, metricDataKey(metric), "-inf", strconv.FormatInt(toTime, 10)).Result()
result, err := c.ZRemRangeByScore(connector.context, metricDataKey(metric), from, to).Result()
if err != nil {
return 0, fmt.Errorf("failed to remove metrics from -inf to %v, error: %w", toTime, err)
return 0, fmt.Errorf("failed to remove metrics from %s to %s, error: %w", from, to, err)
}

return result, nil
Expand Down Expand Up @@ -359,23 +366,25 @@ func (connector *DbConnector) needRemoveMetrics(metric string) bool {
return err == nil
}

func cleanUpOutdatedMetricsOnRedisNode(connector *DbConnector, client redis.UniversalClient, duration time.Duration) error {
func cleanUpMetricsOnRedisNode(connector *DbConnector, client redis.UniversalClient, from, to string) error {
metricsIterator := client.ScanType(connector.context, 0, metricDataKey("*"), 0, "zset").Iterator()
var count int64

for metricsIterator.Next(connector.context) {
key := metricsIterator.Val()
metric := strings.TrimPrefix(key, metricDataKey(""))
deletedCount, err := flushMetric(connector, metric, duration)

deletedCount, err := connector.RemoveMetricValues(metric, from, to)
if err != nil {
return err
}

count += deletedCount
}

connector.logger.Info().
Int64("count deleted metrics", count).
Msg("Cleaned up usefully metrics for trigger")
Msg("Cleaned up metrics")

return nil
}
Expand Down Expand Up @@ -403,11 +412,29 @@ func cleanUpAbandonedRetentionsOnRedisNode(connector *DbConnector, client redis.

func (connector *DbConnector) CleanUpOutdatedMetrics(duration time.Duration) error {
if duration >= 0 {
return errors.New("clean up duration value must be less than zero, otherwise all metrics will be removed")
return ErrCleanUpDurationGreaterThanZero
}

from := "-inf"
toTs := time.Now().UTC().Add(duration).Unix()
to := strconv.FormatInt(toTs, 10)

return connector.callFunc(func(connector *DbConnector, client redis.UniversalClient) error {
return cleanUpOutdatedMetricsOnRedisNode(connector, client, duration)
return cleanUpMetricsOnRedisNode(connector, client, from, to)
})
}

// CleanUpFutureMetrics cleans up metric values written into the future.
// The removal interval starts at the connector clock's current time plus
// duration (as a Unix timestamp) and extends to +inf. A duration that is
// zero or negative is rejected with ErrCleanUpDurationLessThanZero, because
// such an interval could include current metrics.
func (connector *DbConnector) CleanUpFutureMetrics(duration time.Duration) error {
	if duration <= 0 {
		return ErrCleanUpDurationLessThanZero
	}

	// Lower bound of the score range, formatted as a string for the Redis call.
	lowerBound := strconv.FormatInt(connector.clock.Now().Add(duration).Unix(), 10)

	return connector.callFunc(func(nodeConnector *DbConnector, nodeClient redis.UniversalClient) error {
		return cleanUpMetricsOnRedisNode(nodeConnector, nodeClient, lowerBound, "+inf")
	})
}

Expand Down Expand Up @@ -564,16 +591,6 @@ func (connector *DbConnector) RemoveAllMetrics() error {
return connector.callFunc(removeAllMetricsOnRedisNode)
}

// flushMetric asks the database to remove the values of metric up to the
// Unix timestamp of the current UTC time shifted by duration. It returns
// the count and error reported by database.RemoveMetricValues unchanged.
func flushMetric(database moira.Database, metric string, duration time.Duration) (int64, error) {
	cutoff := time.Now().UTC().Add(duration).Unix()

	return database.RemoveMetricValues(metric, cutoff)
}

var patternsListKey = "moira-pattern-list"

var metricEventsChannels = []string{
Expand Down
Loading

0 comments on commit ce13490

Please sign in to comment.