forked from sieve-project/sieve
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstate_machine.go
120 lines (111 loc) · 3.57 KB
/
state_machine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package main
import (
"log"
"time"
)
// StateMachine steps through an ordered list of actions. It keeps the index
// of the action whose triggers are currently being evaluated together with the
// channels that run() selects on.
type StateMachine struct {
	// states is the ordered action list; NewStateMachine takes it from
	// testPlan.actions.
	states []Action
	// nextState is the index into states of the action currently waiting on
	// its triggers. A value >= len(states) means every action has finished.
	nextState int
	// stateNotificationCh is supplied by the caller of NewStateMachine;
	// run() forwards everything received on it to processNotification.
	stateNotificationCh chan TriggerNotification
	// timeoutNotificationCh is created by NewStateMachine with a buffer of
	// 500; waitForTimeout sends on it once its sleep has elapsed.
	timeoutNotificationCh chan TriggerNotification
	// asyncDoneCh is supplied by the caller of NewStateMachine; run() passes
	// everything received on it to processAsyncDone.
	asyncDoneCh chan *AsyncDoneNotification
	// asyncActionInExecution is set by processNotification after it runs an
	// action whose isAsync() is true, and cleared by processAsyncDone. While
	// it is set, processNotification does not evaluate any trigger.
	asyncActionInExecution bool
	// actionConext (spelling as in the code) is the value handed to each
	// action's run method.
	actionConext *ActionContext
}
// NewStateMachine returns a StateMachine positioned at the first action of
// testPlan.
//
// stateNotificationCh and asyncDoneCh are stored as given and later read by
// run(); actionContext is stored and passed to every action that gets run.
// The timeout channel is created here rather than injected.
func NewStateMachine(testPlan *TestPlan, stateNotificationCh chan TriggerNotification, asyncDoneCh chan *AsyncDoneNotification, actionContext *ActionContext) *StateMachine {
	// nextState and asyncActionInExecution are left at their zero values
	// (0 and false).
	machine := new(StateMachine)
	machine.states = testPlan.actions
	machine.stateNotificationCh = stateNotificationCh
	machine.asyncDoneCh = asyncDoneCh
	machine.actionConext = actionContext
	// Buffered so that up to 500 expired timers can be queued without the
	// sending goroutines blocking.
	machine.timeoutNotificationCh = make(chan TriggerNotification, 500)
	return machine
}
// waitForTimeout blocks the calling goroutine for timeoutValue seconds and
// then sends a TimeoutNotification whose conditionName is triggerName on the
// state machine's timeout channel.
//
// It is started with `go` by setTimeoutForTimeoutTriggers. The send blocks if
// the timeout channel's buffer is full.
func (sm *StateMachine) waitForTimeout(timeoutValue int, triggerName string) {
	delay := time.Duration(timeoutValue) * time.Second
	time.Sleep(delay)

	expired := &TimeoutNotification{conditionName: triggerName}
	sm.timeoutNotificationCh <- expired
}
// setTimeoutForTimeoutTriggers starts one waitForTimeout goroutine for every
// trigger in the current action's toSatisfy set whose definition is a
// *TimeoutTrigger. Triggers of any other kind, or with no definition, are
// skipped.
//
// It indexes sm.states with sm.nextState without a bounds check, so callers
// must make sure nextState is still in range.
func (sm *StateMachine) setTimeoutForTimeoutTriggers() {
	current := sm.states[sm.nextState]
	graph := current.getTriggerGraph()
	definitions := current.getTriggerDefinitions()

	for name := range graph.toSatisfy {
		timeoutTrigger, isTimeout := definitions[name].(*TimeoutTrigger)
		if !isTimeout {
			continue
		}
		go sm.waitForTimeout(timeoutTrigger.timeoutValue, timeoutTrigger.getTriggerName())
	}
}
// processNotification feeds one notification to the triggers of the current
// action and, once the action's trigger graph is fully triggered, runs the
// action.
//
// The notification is ignored (no trigger is evaluated) when there are no
// states, when every action has already finished, or while a previously
// started async action has not completed yet. In every case, if the
// notification carries a non-nil blocking channel, "release" is sent on it
// when this method returns.
func (sm *StateMachine) processNotification(notification TriggerNotification) {
	// Runs on every exit path, including the early returns below.
	defer func() {
		releaseCh := notification.getBlockingCh()
		if releaseCh == nil {
			return
		}
		log.Println("release the blocking ch")
		releaseCh <- "release"
	}()

	switch {
	case sm.states == nil:
		return
	case sm.nextState >= len(sm.states):
		// Every action has already finished.
		return
	case sm.asyncActionInExecution:
		// Triggers are not processed until the previous async action is done.
		return
	}

	current := sm.states[sm.nextState]
	graph := current.getTriggerGraph()
	definitions := current.getTriggerDefinitions()

	for name := range graph.toSatisfy {
		definition, known := definitions[name]
		if !known {
			log.Printf("trigger %s is not in the definition table; skip it\n", name)
			continue
		}
		if !definition.satisfy(notification) {
			continue
		}

		graph.trigger(name)
		log.Printf("trigger %s is satisfied\n", name)

		if !graph.fullyTriggered() {
			// More triggers are still outstanding: arm timers for the timeout
			// triggers currently in toSatisfy and keep scanning.
			// NOTE(review): this may start a second timer for a timeout
			// trigger that was already armed and is still in toSatisfy —
			// confirm duplicates are harmless.
			sm.setTimeoutForTimeoutTriggers()
			continue
		}

		log.Printf("all triggers are satisfied for action %d\n", sm.nextState)
		current.run(sm.actionConext)

		if current.isAsync() {
			// The state only advances once processAsyncDone is called.
			sm.asyncActionInExecution = true
			return
		}

		sm.nextState++
		if sm.nextState < len(sm.states) {
			sm.setTimeoutForTimeoutTriggers()
		} else {
			log.Println("Sieve test coordinator finishes all actions")
		}
		// The action has run; stop looking at the remaining triggers.
		return
	}
}
// processAsyncDone advances the state machine to the next action and clears
// the async-in-execution flag. If another action remains, timers are armed
// for its timeout triggers; otherwise completion is logged.
//
// The notification argument itself is not inspected.
func (sm *StateMachine) processAsyncDone(notification *AsyncDoneNotification) {
	sm.asyncActionInExecution = false
	sm.nextState++

	if sm.nextState < len(sm.states) {
		sm.setTimeoutForTimeoutTriggers()
		return
	}
	log.Println("Sieve test coordinator finishes all actions")
}
// run is the state machine's event loop. It never returns: each iteration
// waits for one message on the state, timeout or async-done channel and
// dispatches it. State and timeout notifications both go to
// processNotification; async-done notifications go to processAsyncDone.
func (sm *StateMachine) run() {
	for {
		var notification TriggerNotification
		select {
		case done := <-sm.asyncDoneCh:
			sm.processAsyncDone(done)
			continue
		case notification = <-sm.stateNotificationCh:
		case notification = <-sm.timeoutNotificationCh:
		}
		sm.processNotification(notification)
	}
}