Skip to content

Commit

Permalink
fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
almostinf committed Apr 23, 2024
1 parent f88cf64 commit 2e3e07a
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 14 deletions.
61 changes: 48 additions & 13 deletions database/redis/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,20 +426,14 @@ func (connector *DbConnector) CleanupOutdatedPatternMetrics() (int64, error) {
pipe := client.TxPipeline()

for _, pattern := range patterns {
patternMetricsIterator := client.SScan(ctx, patternMetricsKey(pattern), 0, "*", 0).Iterator()

for patternMetricsIterator.Next(ctx) {
metric := patternMetricsIterator.Val()

res, err := client.Exists(ctx, metricDataKey(metric)).Result()
if err != nil {
return count, fmt.Errorf("failed to check metric on existence: %w", err)
}
nonExistentMetrics, err := connector.getNonExistentPatternMetrics(pattern)
if err != nil {
return count, fmt.Errorf("failed to get non existent metrics by pattern: %w", err)
}

if res == 0 {
pipe.SRem(ctx, patternMetricsKey(pattern), metric)
count++
}
for _, metric := range nonExistentMetrics {
pipe.SRem(ctx, patternMetricsKey(pattern), metric)
count++
}
}

Expand All @@ -450,6 +444,47 @@ func (connector *DbConnector) CleanupOutdatedPatternMetrics() (int64, error) {
return count, nil
}

func (connector *DbConnector) getNonExistentPatternMetrics(pattern string) ([]string, error) {
ctx := connector.context
client := *connector.client

metrics, err := connector.GetPatternMetrics(pattern)
if err != nil {
return nil, fmt.Errorf("failed to get pattern metrics: %w", err)
}

pipe := client.TxPipeline()

for _, metric := range metrics {
pipe.Exists(ctx, metricDataKey(metric))
}

exec, err := pipe.Exec(ctx)
if err != nil {
return nil, fmt.Errorf("failed to Exec Exists metric by pattern: %w", err)
}

nonExistentMetrics := make([]string, 0)

for i, cmder := range exec {
cmd, ok := cmder.(*redis.IntCmd)
if !ok {
return nil, fmt.Errorf("failed to convert cmder to intcmd result: %w", err)
}

res, err := cmd.Result()
if err != nil {
return nil, err
}

if res == 0 {
nonExistentMetrics = append(nonExistentMetrics, metrics[i])
}
}

return nonExistentMetrics, nil
}

// CleanUpAbandonedRetentions removes metric retention keys that have no corresponding metric data.
func (connector *DbConnector) CleanUpAbandonedRetentions() error {
return connector.callFunc(cleanUpAbandonedRetentionsOnRedisNode)
Expand Down
131 changes: 131 additions & 0 deletions database/redis/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,137 @@ func TestCleanupOutdatedPatternMetrics(t *testing.T) {
})
}

func TestGetNonExistentPatternMetrics(t *testing.T) {
logger, _ := logging.ConfigureLog("stdout", "warn", "test", true)
dataBase := NewTestDatabase(logger)
dataBase.Flush()
defer dataBase.Flush()

const (
pattern1 = "pattern1"
pattern2 = "pattern2"
pattern3 = "pattern3"

metric1 = "metric1"
metric2 = "metric2"
metric3 = "metric3"
metric4 = "metric4"
metric5 = "metric5"
)

Convey("Test get non existent pattern metrics", t, func() {
Convey("Without non existent metrics", func() {
defer func() {
err := dataBase.RemovePatternWithMetrics(pattern1)
So(err, ShouldBeNil)

err = dataBase.RemovePatternWithMetrics(pattern2)
So(err, ShouldBeNil)

err = dataBase.RemovePatternWithMetrics(pattern3)
So(err, ShouldBeNil)
}()

matchedMetric1 := &moira.MatchedMetric{
Patterns: []string{pattern1},
Metric: metric1,
}
matchedMetric2 := &moira.MatchedMetric{
Patterns: []string{pattern1},
Metric: metric2,
}
matchedMetric3 := &moira.MatchedMetric{
Patterns: []string{pattern2},
Metric: metric3,
}
matchedMetric4 := &moira.MatchedMetric{
Patterns: []string{pattern2},
Metric: metric4,
}
matchedMetric5 := &moira.MatchedMetric{
Patterns: []string{pattern3},
Metric: metric5,
}

err := dataBase.addPatterns(pattern1, pattern2, pattern3)
So(err, ShouldBeNil)

err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{
metric1: matchedMetric1,
metric2: matchedMetric2,
metric3: matchedMetric3,
metric4: matchedMetric4,
metric5: matchedMetric5,
})
So(err, ShouldBeNil)

pattern1Metrics, err := dataBase.getNonExistentPatternMetrics(pattern1)
So(err, ShouldBeNil)
assert.ElementsMatch(t, pattern1Metrics, []string{})

pattern2Metrics, err := dataBase.getNonExistentPatternMetrics(pattern2)
So(err, ShouldBeNil)
assert.ElementsMatch(t, pattern2Metrics, []string{})

pattern3Metrics, err := dataBase.getNonExistentPatternMetrics(pattern3)
So(err, ShouldBeNil)
assert.ElementsMatch(t, pattern3Metrics, []string{})
})

Convey("With outdated metrics", func() {
defer func() {
err := dataBase.RemovePatternWithMetrics(pattern1)
So(err, ShouldBeNil)

err = dataBase.RemovePatternWithMetrics(pattern2)
So(err, ShouldBeNil)

err = dataBase.RemovePatternWithMetrics(pattern3)
So(err, ShouldBeNil)
}()

matchedMetric1 := &moira.MatchedMetric{
Patterns: []string{pattern1},
Metric: metric1,
}
matchedMetric3 := &moira.MatchedMetric{
Patterns: []string{pattern2},
Metric: metric3,
}

err := dataBase.addPatterns(pattern1, pattern2, pattern3)
So(err, ShouldBeNil)

err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{
metric1: matchedMetric1,
metric3: matchedMetric3,
})
So(err, ShouldBeNil)

err = dataBase.AddPatternMetric(pattern1, metric2)
So(err, ShouldBeNil)

err = dataBase.AddPatternMetric(pattern2, metric4)
So(err, ShouldBeNil)

err = dataBase.AddPatternMetric(pattern3, metric5)
So(err, ShouldBeNil)

pattern1Metrics, err := dataBase.getNonExistentPatternMetrics(pattern1)
So(err, ShouldBeNil)
assert.ElementsMatch(t, pattern1Metrics, []string{metric2})

pattern2Metrics, err := dataBase.getNonExistentPatternMetrics(pattern2)
So(err, ShouldBeNil)
assert.ElementsMatch(t, pattern2Metrics, []string{metric4})

pattern3Metrics, err := dataBase.getNonExistentPatternMetrics(pattern3)
So(err, ShouldBeNil)
assert.ElementsMatch(t, pattern3Metrics, []string{metric5})
})
})
}

func TestCleanupAbandonedRetention(t *testing.T) {
logger, _ := logging.ConfigureLog("stdout", "warn", "test", true)
dataBase := NewTestDatabase(logger)
Expand Down
2 changes: 1 addition & 1 deletion database/redis/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func (connector *DbConnector) fetchNotificationsDo(to int64, limit int64) ([]*mo
var affected int
affected, err = connector.resaveNotifications(ctx, pipe, types.ToResaveOld, types.ToResaveNew)
if err != nil {
return fmt.Errorf("failed to resave notifications in transaction")
return fmt.Errorf("failed to resave notifications in transaction: %w", err)
}

if affected != len(types.ToResaveOld)+len(types.ToResaveNew) {
Expand Down

0 comments on commit 2e3e07a

Please sign in to comment.