-
Notifications
You must be signed in to change notification settings - Fork 490
/
Copy pathedge.go
66 lines (55 loc) · 1.3 KB
/
edge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package kapacitor
import (
"errors"
"sync"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/server/vars"
)
const (
statCollected = "collected"
statEmitted = "emitted"
defaultEdgeBufferSize = 1000
)
var ErrAborted = errors.New("edged aborted")
type EdgeDiagnostic interface {
ClosingEdge(collected, emitted int64)
}
type Edge struct {
edge.StatsEdge
mu sync.Mutex
closed bool
statsKey string
statMap *expvar.Map
diag EdgeDiagnostic
}
func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, size int, d EdgeDiagnostic) edge.StatsEdge {
e := edge.NewStatsEdge(edge.NewChannelEdge(t, defaultEdgeBufferSize))
tags := map[string]string{
"task": taskName,
"parent": parentName,
"child": childName,
"type": t.String(),
}
key, sm := vars.NewStatistic("edges", tags)
sm.Set(statCollected, e.CollectedVar())
sm.Set(statEmitted, e.EmittedVar())
return &Edge{
StatsEdge: e,
statsKey: key,
statMap: sm,
diag: d,
}
}
func (e *Edge) Close() error {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil
}
e.closed = true
vars.DeleteStatistic(e.statsKey)
e.diag.ClosingEdge(e.Collected(), e.Emitted())
return e.StatsEdge.Close()
}