Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge pull request #13 from tie/fix-ticker-stop
Browse files Browse the repository at this point in the history
fix: do not tick after stop
  • Loading branch information
ernado authored Dec 27, 2021
2 parents 7efdd3c + ac80857 commit c20eb63
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 60 deletions.
2 changes: 2 additions & 0 deletions net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func TestNetPing(t *testing.T) {
}

func TestNetPingDeadline(t *testing.T) {
t.Skip("skipping flaky test, see https://github.com/gotd/neo/pull/13#issuecomment-1001285136")

nt := &Net{
peers: make(map[string]*PacketConn),
}
Expand Down
24 changes: 19 additions & 5 deletions ticker.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@
package neo

import (
"sync/atomic"
"time"
)

type ticker struct {
time *Time
ch chan time.Time
id int
dur int64
dur time.Duration
}

func (t *ticker) C() <-chan time.Time {
return t.ch
}

func (t *ticker) Stop() {
t.time.stopTimer(t.id)
t.time.stop(t.id)
}

func (t *ticker) Reset(d time.Duration) {
atomic.StoreInt64(&t.dur, int64(d))
t.time.resetTimer(d, t.id, t.ch)
t.time.reset(d, t.id, t.do, &t.dur)
}

// do is the ticker’s moment callback. It sends the now time to the underlying
// channel and plans a new moment for the next tick. Note that do runs under
// Time’s lock.
func (t *ticker) do(now time.Time) {
t.ch <- now

// It is safe to mutate ID without a lock since at most one moment
// exists for the given ticker and moments run under the Time’s lock.
t.time.resetUnlocked(t.dur, t.id, t.do, nil)

// Ticker used to create a new moment for each tick and that would close
// the observe channel. Maintain backwards compatibility for users that
// may rely on this behavior.
t.time.observeUnlocked()
}
98 changes: 48 additions & 50 deletions time.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package neo
import (
"sort"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -33,6 +32,8 @@ func NewTime(now time.Time) *Time {
//
// All methods are goroutine-safe.
type Time struct {
// mux guards internal state. Note that all methods without Unlocked
// suffix acquire mux.
mux sync.Mutex
now time.Time
momentID int
Expand All @@ -42,36 +43,22 @@ type Time struct {
}

func (t *Time) Timer(d time.Duration) Timer {
done := make(chan time.Time, 1)

return timer{
tt := &timer{
time: t,
ch: done,
id: t.plan(t.When(d), func(now time.Time) {
done <- now
}),
ch: make(chan time.Time, 1),
}
tt.id = t.plan(t.When(d), tt.do)
return tt
}

func (t *Time) Ticker(d time.Duration) Ticker {
done := make(chan time.Time, 1)

tick := &ticker{
tt := &ticker{
time: t,
ch: done,
dur: int64(d),
}

var cb func(now time.Time)
cb = func(now time.Time) {
done <- now

dur := time.Duration(atomic.LoadInt64(&tick.dur))
t.planUnlocked(now.Add(dur), cb)
ch: make(chan time.Time, 1),
dur: d,
}
tick.id = t.plan(t.When(d), cb)

return tick
tt.id = t.plan(t.When(d), tt.do)
return tt
}

func (t *Time) planUnlocked(when time.Time, do func(now time.Time)) int {
Expand All @@ -81,7 +68,7 @@ func (t *Time) planUnlocked(when time.Time, do func(now time.Time)) int {
when: when,
do: do,
}
t.observe()
t.observeUnlocked()
return id
}

Expand All @@ -92,7 +79,9 @@ func (t *Time) plan(when time.Time, do func(now time.Time)) int {
return t.planUnlocked(when, do)
}

func (t *Time) stopTimer(id int) bool {
// stop removes the moment with the given ID from the list of scheduled moments.
// It returns true if a moment existed for the given ID, otherwise it is no-op.
func (t *Time) stop(id int) bool {
t.mux.Lock()
defer t.mux.Unlock()

Expand All @@ -101,27 +90,32 @@ func (t *Time) stopTimer(id int) bool {
return ok
}

func (t *Time) resetTimer(d time.Duration, id int, ch chan time.Time) {
// reset adjusts the moment with the given ID to run after the d duration. It
// creates a new moment if the moment does not already exist. If durp pointer
// is not nil, it is updated with d value while reset is holding Time’s lock.
func (t *Time) reset(d time.Duration, id int, do func(now time.Time), durp *time.Duration) {
t.mux.Lock()
defer t.mux.Unlock()
t.resetUnlocked(d, id, do, durp)
}

// resetUnlocked is like reset but does not acquire the Time’s lock.
func (t *Time) resetUnlocked(d time.Duration, id int, do func(now time.Time), durp *time.Duration) {
if durp != nil {
*durp = d
}

m, ok := t.moments[id]
if !ok {
m = moment{
do: func(now time.Time) {
ch <- now
},
}
m = moment{do: do}
}

m.when = t.now.Add(d)
t.moments[id] = m
}

// tick applies all scheduled temporal effects.
//
// The mux lock is expected.
func (t *Time) tick() moments {
// tickUnlocked applies all scheduled temporal effects.
func (t *Time) tickUnlocked() moments {
var past moments

for id, m := range t.moments {
Expand All @@ -139,30 +133,27 @@ func (t *Time) tick() moments {
// Now returns the current time.
func (t *Time) Now() time.Time {
t.mux.Lock()
now := t.now
t.mux.Unlock()
return now
defer t.mux.Unlock()
return t.now
}

// Set travels to specified time.
//
// Also triggers temporal effects.
func (t *Time) Set(now time.Time) {
t.mux.Lock()
t.now = now
t.mux.Unlock()
t.tick().do(now)
defer t.mux.Unlock()
t.setUnlocked(now)
}

// Travel adds duration to current time and returns result.
//
// Also triggers temporal effects.
func (t *Time) Travel(d time.Duration) time.Time {
t.mux.Lock()
defer t.mux.Unlock()
now := t.now.Add(d)
t.now = now
t.tick().do(now)
t.mux.Unlock()
t.setUnlocked(now)
return now
}

Expand All @@ -171,13 +162,19 @@ func (t *Time) Travel(d time.Duration) time.Time {
// Also triggers temporal effects.
func (t *Time) TravelDate(years, months, days int) time.Time {
t.mux.Lock()
defer t.mux.Unlock()
now := t.now.AddDate(years, months, days)
t.now = now
t.tick().do(now)
t.mux.Unlock()
t.setUnlocked(now)
return now
}

// setUnlocked sets the current time to the given now time and triggers temporal
// effects.
func (t *Time) setUnlocked(now time.Time) {
t.now = now
t.tickUnlocked().do(now)
}

// Sleep blocks until duration is elapsed.
func (t *Time) Sleep(d time.Duration) { <-t.After(d) }

Expand All @@ -196,7 +193,8 @@ func (t *Time) After(d time.Duration) <-chan time.Time {
return done
}

// Observe return channel that closes on clock calls.
// Observe return channel that closes on clock calls. The current implementation
// also closes the channel on Ticker’s ticks.
func (t *Time) Observe() <-chan struct{} {
observer := make(chan struct{})
t.mux.Lock()
Expand All @@ -206,7 +204,7 @@ func (t *Time) Observe() <-chan struct{} {
return observer
}

func (t *Time) observe() {
func (t *Time) observeUnlocked() {
for _, observer := range t.observers {
close(observer)
}
Expand Down
125 changes: 125 additions & 0 deletions time_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package neo

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -171,3 +172,127 @@ func TestTime_Ticker(t *testing.T) {
}
}
}

func TestTime_TickerStop(t *testing.T) {
const interval = time.Second

now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC)
sim := NewTime(now)

ticker := sim.Ticker(interval)
defer ticker.Stop()

// Tick once and stop ticker.
sim.Travel(interval)
select {
case <-ticker.C():
default:
t.Error("unexpected state")
}
ticker.Stop()

// Advance time by the tick interval and check that the tick was not
// sent on the channel.
sim.Travel(interval)
select {
case <-ticker.C():
t.Error("unexpected state")
default:
}

// Check that we can reset the ticker after stopping it and there are no
// erroneous ticks.
ticker.Reset(interval)
for range [3]struct{}{} {
select {
case <-ticker.C():
t.Error("unexpected done")
default:
}

sim.Travel(interval)

select {
case <-ticker.C():
default:
t.Error("unexpected state")
}
}
}

func TestTime_ObserveTick(t *testing.T) {
const interval = time.Second

now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC)
sim := NewTime(now)

ticker := sim.Ticker(interval)
defer ticker.Stop()

// Check that we do not break existing users of the Time implementation:
// the observe channel must be closed on each tick.
for range [3]struct{}{} {
observe := sim.Observe()
sim.Travel(interval)
select {
case <-ticker.C():
default:
t.Error("unexpected state")
}
select {
case <-observe:
default:
t.Error("missing observation on tick")
}
}
}

func TestTime_Sleep(t *testing.T) {
const interval = time.Second

now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC)
sim := NewTime(now)

observe := sim.Observe()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
sim.Sleep(interval)
}()
<-observe
sim.Travel(interval)
wg.Wait()
}

func TestTime_TravelSteps(t *testing.T) {
const (
step = time.Second
n = 5
)

now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC)
sim := NewTime(now)

timer := sim.Timer(n * step)
defer timer.Stop()

// Make all steps except the last one.
for i := 1; i < n; i++ {
sim.Travel(step)
select {
case <-timer.C():
t.Fatal("unexpected state")
default:
}
}

// Make the last step.
sim.Travel(step)
select {
case <-timer.C():
default:
t.Fatal("unexpected state")
}
}
Loading

0 comments on commit c20eb63

Please sign in to comment.