Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filter): Move batch forced save timeout to config #1014

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions cmd/filter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type filterConfig struct {
DropMetricsTTL string `yaml:"drop_metrics_ttl"`
// Flags for compatibility with different graphite behaviours
Compatibility compatibility `yaml:"graphite_compatibility"`
// Time after which the batch of metrics is forced to be saved, default is 1s
BatchForcedSaveTimeout string `yaml:"batch_forced_save_timeout"`
}

func getDefault() config {
Expand All @@ -45,12 +47,13 @@ func getDefault() config {
LogPrettyFormat: false,
},
Filter: filterConfig{
Listen: ":2003",
RetentionConfig: "/etc/moira/storage-schemas.conf",
CacheCapacity: 10, //nolint
MaxParallelMatches: 0,
PatternsUpdatePeriod: "1s",
DropMetricsTTL: "1h",
Listen: ":2003",
RetentionConfig: "/etc/moira/storage-schemas.conf",
CacheCapacity: 10, //nolint
MaxParallelMatches: 0,
PatternsUpdatePeriod: "1s",
DropMetricsTTL: "1h",
BatchForcedSaveTimeout: "1s",
Compatibility: compatibility{
AllowRegexLooseStartMatch: false,
AllowRegexMatchEmpty: true,
Expand Down
3 changes: 2 additions & 1 deletion cmd/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func main() {

// Start metrics matcher
cacheCapacity := config.Filter.CacheCapacity
metricsMatcher := matchedmetrics.NewMetricsMatcher(filterMetrics, logger, database, cacheStorage, cacheCapacity)
batchForcedSaveTimeout := to.Duration(config.Filter.BatchForcedSaveTimeout)
almostinf marked this conversation as resolved.
Show resolved Hide resolved
metricsMatcher := matchedmetrics.NewMetricsMatcher(filterMetrics, logger, database, cacheStorage, cacheCapacity, batchForcedSaveTimeout)
metricsMatcher.Start(metricsChan)
defer metricsMatcher.Wait() // First stop listener
defer stopListener(listener) // Then waiting for metrics matcher handle all received events
Expand Down
33 changes: 18 additions & 15 deletions filter/matched_metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (

// MetricsMatcher make buffer of metrics and save it.
type MetricsMatcher struct {
logger moira.Logger
metrics *metrics.FilterMetrics
database moira.Database
cacheStorage *filter.Storage
cacheCapacity int
waitGroup *sync.WaitGroup
closeRequest chan struct{}
logger moira.Logger
metrics *metrics.FilterMetrics
database moira.Database
cacheStorage *filter.Storage
cacheCapacity int
waitGroup *sync.WaitGroup
closeRequest chan struct{}
batchForcedSaveTimeout time.Duration
}

// NewMetricsMatcher creates new MetricsMatcher.
Expand All @@ -28,15 +29,17 @@ func NewMetricsMatcher(
database moira.Database,
cacheStorage *filter.Storage,
cacheCapacity int,
batchForcedSaveTimeout time.Duration,
) *MetricsMatcher {
return &MetricsMatcher{
metrics: metrics,
logger: logger,
database: database,
cacheStorage: cacheStorage,
cacheCapacity: cacheCapacity,
waitGroup: &sync.WaitGroup{},
closeRequest: make(chan struct{}),
metrics: metrics,
logger: logger,
database: database,
cacheStorage: cacheStorage,
cacheCapacity: cacheCapacity,
waitGroup: &sync.WaitGroup{},
closeRequest: make(chan struct{}),
batchForcedSaveTimeout: batchForcedSaveTimeout,
}
}

Expand All @@ -63,7 +66,7 @@ func (matcher *MetricsMatcher) receiveBatch(metrics <-chan *moira.MatchedMetric)

go func() {
defer close(batchedMetrics)
batchTimer := time.NewTimer(time.Second)
batchTimer := time.NewTimer(matcher.batchForcedSaveTimeout)
defer batchTimer.Stop()
for {
batch := make(map[string]*moira.MatchedMetric, matcher.cacheCapacity)
Expand Down
Loading