Skip to content

Commit

Permalink
ITimer abstracts out a real Timer (#518)
Browse files Browse the repository at this point in the history
This PR adds ITimer abstraction and makes all timer-dependent tests use it to improve their stability and performance.

Fixes #508
  • Loading branch information
alnikola authored Nov 5, 2020
1 parent bb6978d commit aa87462
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 200 deletions.
11 changes: 7 additions & 4 deletions src/ReverseProxy/Service/Management/EntityActionScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void ChangePeriod(T entity, TimeSpan newPeriod)
{
if (_entries.TryGetValue(entity, out var entry))
{
entry.ChangePeriod((long)newPeriod.TotalMilliseconds);
entry.ChangePeriod((long)newPeriod.TotalMilliseconds, Volatile.Read(ref _isStarted) == 1);
}
}

Expand Down Expand Up @@ -119,12 +119,15 @@ public SchedulerEntry(T entity, long period, TimerCallback timerCallback, bool a

public long Period => _period;

public Timer Timer { get; }
public ITimer Timer { get; }

public void ChangePeriod(long newPeriod)
public void ChangePeriod(long newPeriod, bool resetTimer)
{
Interlocked.Exchange(ref _period, newPeriod);
SetTimer();
if (resetTimer)
{
SetTimer();
}
}

public void SetTimer()
Expand Down
12 changes: 12 additions & 0 deletions src/ReverseProxy/Utilities/ITimer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;

namespace Microsoft.ReverseProxy.Utilities
{
internal interface ITimer : IDisposable
{
void Change(long dueTime, long period);
}
}
3 changes: 1 addition & 2 deletions src/ReverseProxy/Utilities/ITimerFactory.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Threading;

namespace Microsoft.ReverseProxy.Utilities
{
internal interface ITimerFactory
{
Timer CreateTimer(TimerCallback callback, object state, long dueTime, long period);
ITimer CreateTimer(TimerCallback callback, object state, long dueTime, long period);
}
}
4 changes: 2 additions & 2 deletions src/ReverseProxy/Utilities/TimerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace Microsoft.ReverseProxy.Utilities
{
internal class TimerFactory : ITimerFactory
{
public Timer CreateTimer(TimerCallback callback, object state, long dueTime, long period)
public ITimer CreateTimer(TimerCallback callback, object state, long dueTime, long period)
{
return new Timer(callback, state, dueTime, period);
return new TimerWrapper(callback, state, dueTime, period);
}
}
}
27 changes: 27 additions & 0 deletions src/ReverseProxy/Utilities/TimerWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Threading;

namespace Microsoft.ReverseProxy.Utilities
{
internal class TimerWrapper : ITimer
{
private readonly Timer _realTimer;

public TimerWrapper(TimerCallback callback, object state, long dueTime, long period)
{
_realTimer = new Timer(callback, state, dueTime, period);
}

public void Change(long dueTime, long period)
{
_realTimer.Change(dueTime, period);
}

public void Dispose()
{
_realTimer.Dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ namespace Microsoft.ReverseProxy.Service.HealthChecks
{
public class ActiveHealthCheckMonitorTests
{
private const long Interval0 = 10000;
private const long Interval1 = 20000;

[Fact]
public async Task CheckHealthAsync_ActiveHealthCheckIsEnabledForCluster_SendProbe()
{
Expand Down Expand Up @@ -63,17 +66,19 @@ public async Task ProbeCluster_ProbingTimerFired_SendProbesAndReceiveResponses()
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy0.Object, policy1.Object }, new DefaultProbingRequestFactory(), timerFactory, GetLogger());

var httpClient0 = GetHttpClient();
var cluster0 = GetClusterInfo("cluster0", "policy0", true, httpClient0.Object, interval: TimeSpan.FromSeconds(10));
var cluster0 = GetClusterInfo("cluster0", "policy0", true, httpClient0.Object, TimeSpan.FromMilliseconds(Interval0));
monitor.OnClusterAdded(cluster0);
var httpClient2 = GetHttpClient();
var cluster2 = GetClusterInfo("cluster2", "policy1", true, httpClient2.Object, interval: TimeSpan.FromSeconds(20));
var cluster2 = GetClusterInfo("cluster2", "policy1", true, httpClient2.Object, TimeSpan.FromMilliseconds(Interval1));
monitor.OnClusterAdded(cluster2);

await monitor.CheckHealthAsync(new ClusterInfo[0]);

timerFactory.FireAndWaitAll();
timerFactory.FireAll();

Assert.Equal(2, timerFactory.Count);
timerFactory.VerifyTimer(0, Interval0);
timerFactory.VerifyTimer(1, Interval1);
VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 1), ("https://localhost:20001/cluster0/api/health/", 1) }, policyCallTimes: 1);
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:20000/cluster2/api/health/", 1), ("https://localhost:20001/cluster2/api/health/", 1) }, policyCallTimes: 1);
}
Expand All @@ -90,26 +95,27 @@ public async Task ProbeCluster_ClusterRemoved_StopSendingProbes()
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy0.Object, policy1.Object }, new DefaultProbingRequestFactory(), timerFactory, GetLogger());

var httpClient0 = GetHttpClient();
var cluster0 = GetClusterInfo("cluster0", "policy0", true, httpClient0.Object, interval: TimeSpan.FromSeconds(10));
var cluster0 = GetClusterInfo("cluster0", "policy0", true, httpClient0.Object, interval: TimeSpan.FromMilliseconds(Interval0));
monitor.OnClusterAdded(cluster0);
var httpClient2 = GetHttpClient();
var cluster2 = GetClusterInfo("cluster2", "policy1", true, httpClient2.Object, interval: TimeSpan.FromSeconds(20));
var cluster2 = GetClusterInfo("cluster2", "policy1", true, httpClient2.Object, interval: TimeSpan.FromMilliseconds(Interval1));
monitor.OnClusterAdded(cluster2);

await monitor.CheckHealthAsync(new ClusterInfo[0]);

timerFactory.FireAndWaitAll();
timerFactory.FireAll();

Assert.Equal(2, timerFactory.Count);
timerFactory.VerifyTimer(0, Interval0);
timerFactory.VerifyTimer(1, Interval1);
VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 1), ("https://localhost:20001/cluster0/api/health/", 1) }, policyCallTimes: 1);
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:20000/cluster2/api/health/", 1), ("https://localhost:20001/cluster2/api/health/", 1) }, policyCallTimes: 1);

monitor.OnClusterRemoved(cluster2);

timerFactory.FireTimer(0);
timerFactory.WaitOnCallback(0);

Assert.Throws<ObjectDisposedException>(() => timerFactory.FireTimer(1));
timerFactory.AssertTimerDisposed(1);

VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 2), ("https://localhost:20001/cluster0/api/health/", 2) }, policyCallTimes: 2);
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:20000/cluster2/api/health/", 1), ("https://localhost:20001/cluster2/api/health/", 1) }, policyCallTimes: 1);
Expand All @@ -127,24 +133,26 @@ public async Task ProbeCluster_ClusterAdded_StartSendingProbes()
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy0.Object, policy1.Object }, new DefaultProbingRequestFactory(), timerFactory, GetLogger());

var httpClient0 = GetHttpClient();
var cluster0 = GetClusterInfo("cluster0", "policy0", true, httpClient0.Object, interval: TimeSpan.FromSeconds(10));
var cluster0 = GetClusterInfo("cluster0", "policy0", true, httpClient0.Object, interval: TimeSpan.FromMilliseconds(Interval0));
monitor.OnClusterAdded(cluster0);

await monitor.CheckHealthAsync(new ClusterInfo[0]);

timerFactory.FireAndWaitAll();
timerFactory.FireAll();

Assert.Equal(1, timerFactory.Count);
timerFactory.VerifyTimer(0, Interval0);
VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 1), ("https://localhost:20001/cluster0/api/health/", 1) }, policyCallTimes: 1);

var httpClient2 = GetHttpClient();
var cluster2 = GetClusterInfo("cluster2", "policy1", true, httpClient2.Object, interval: TimeSpan.FromSeconds(20));
var cluster2 = GetClusterInfo("cluster2", "policy1", true, httpClient2.Object, interval: TimeSpan.FromMilliseconds(Interval1));
monitor.OnClusterAdded(cluster2);

timerFactory.FireAndWaitAll();
timerFactory.FireAll();

Assert.Equal(2, timerFactory.Count);
timerFactory.VerifyTimer(1, 20000);
timerFactory.VerifyTimer(0, Interval0);
timerFactory.VerifyTimer(1, Interval1);
VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 2), ("https://localhost:20001/cluster0/api/health/", 2) }, policyCallTimes: 2);
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:20000/cluster2/api/health/", 1), ("https://localhost:20001/cluster2/api/health/", 1) }, policyCallTimes: 1);
}
Expand All @@ -161,16 +169,19 @@ public async Task ProbeCluster_ClusterChanged_SendProbesToNewHealthEndpoint()
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy0.Object, policy1.Object }, new DefaultProbingRequestFactory(), timerFactory, GetLogger());

var httpClient0 = GetHttpClient();
var cluster0 = GetClusterInfo("cluster0", "policy0", true, httpClient0.Object, interval: TimeSpan.FromSeconds(10));
var cluster0 = GetClusterInfo("cluster0", "policy0", true, httpClient0.Object, interval: TimeSpan.FromMilliseconds(Interval0));
monitor.OnClusterAdded(cluster0);
var httpClient2 = GetHttpClient();
var cluster2 = GetClusterInfo("cluster2", "policy1", true, httpClient2.Object, interval: TimeSpan.FromSeconds(20));
var cluster2 = GetClusterInfo("cluster2", "policy1", true, httpClient2.Object, interval: TimeSpan.FromMilliseconds(Interval1));
monitor.OnClusterAdded(cluster2);

await monitor.CheckHealthAsync(new ClusterInfo[0]);

timerFactory.FireAndWaitAll();
timerFactory.FireAll();

Assert.Equal(2, timerFactory.Count);
timerFactory.VerifyTimer(0, Interval0);
timerFactory.VerifyTimer(1, Interval1);
VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 1), ("https://localhost:20001/cluster0/api/health/", 1) }, policyCallTimes: 1);
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:20000/cluster2/api/health/", 1), ("https://localhost:20001/cluster2/api/health/", 1) }, policyCallTimes: 1);

Expand All @@ -185,8 +196,11 @@ public async Task ProbeCluster_ClusterChanged_SendProbesToNewHealthEndpoint()

monitor.OnClusterChanged(cluster2);

timerFactory.FireAndWaitAll();
timerFactory.FireAll();

Assert.Equal(2, timerFactory.Count);
timerFactory.VerifyTimer(0, Interval0);
timerFactory.VerifyTimer(1, Interval1);
VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 2), ("https://localhost:20001/cluster0/api/health/", 2) }, policyCallTimes: 2);
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:10000/cluster2/api/health/", 1), ("https://localhost:10001/cluster2/api/health/", 1) }, policyCallTimes: 2);
}
Expand All @@ -203,15 +217,15 @@ public async Task ProbeCluster_ClusterChanged_StopSendingProbes()
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy0.Object, policy1.Object }, new DefaultProbingRequestFactory(), timerFactory, GetLogger());

var httpClient0 = GetHttpClient();
var cluster0 = GetClusterInfo("cluster0", "policy0", true, httpClient0.Object, interval: TimeSpan.FromSeconds(10));
var cluster0 = GetClusterInfo("cluster0", "policy0", true, httpClient0.Object, interval: TimeSpan.FromMilliseconds(Interval0));
monitor.OnClusterAdded(cluster0);
var httpClient2 = GetHttpClient();
var cluster2 = GetClusterInfo("cluster2", "policy1", true, httpClient2.Object, interval: TimeSpan.FromSeconds(20));
var cluster2 = GetClusterInfo("cluster2", "policy1", true, httpClient2.Object, interval: TimeSpan.FromMilliseconds(Interval1));
monitor.OnClusterAdded(cluster2);

await monitor.CheckHealthAsync(new ClusterInfo[0]);

timerFactory.FireAndWaitAll();
timerFactory.FireAll();

VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 1), ("https://localhost:20001/cluster0/api/health/", 1) }, policyCallTimes: 1);
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:20000/cluster2/api/health/", 1), ("https://localhost:20001/cluster2/api/health/", 1) }, policyCallTimes: 1);
Expand All @@ -224,9 +238,8 @@ public async Task ProbeCluster_ClusterChanged_StopSendingProbes()
monitor.OnClusterChanged(cluster2);

timerFactory.FireTimer(0);
timerFactory.WaitOnCallback(0);

Assert.Throws<ObjectDisposedException>(() => timerFactory.FireTimer(1));
timerFactory.AssertTimerDisposed(1);
VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 2), ("https://localhost:20001/cluster0/api/health/", 2) }, policyCallTimes: 2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ public void Schedule_ReactivationPeriodElapsed_SetPassiveHealthToUnknown()

var reactivationPeriod = TimeSpan.FromSeconds(2);
scheduler.Schedule(destination, reactivationPeriod);
timerFactory.VerifyTimer(0, 2000);

timerFactory.FireAndWaitAll();
timerFactory.FireAll();

timerFactory.VerifyTimer(0, 2000);
Assert.Equal(DestinationHealth.Unhealthy, destination.Health.Active);
Assert.Equal(DestinationHealth.Unknown, destination.Health.Passive);
timerFactory.AssertTimerDisposed(0);
}

[Fact]
public void Schedule_ReactivationPeriodElapsedTwice_ReactivateDestinationOnlyOnce()
public void Schedule_DestinationIsAlreadyHealthy_DoNothing()
{
var destination = new DestinationInfo("destination0");
destination.Health.Active = DestinationHealth.Unhealthy;
Expand All @@ -46,15 +47,14 @@ public void Schedule_ReactivationPeriodElapsedTwice_ReactivateDestinationOnlyOnc
Assert.Equal(DestinationHealth.Unhealthy, destination.Health.Active);
Assert.Equal(DestinationHealth.Unhealthy, destination.Health.Passive);

var reactivationPeriod = TimeSpan.FromSeconds(2);
scheduler.Schedule(destination, reactivationPeriod);
scheduler.Schedule(destination, TimeSpan.FromSeconds(2));

timerFactory.FireAndWaitAll();
destination.Health.Passive = DestinationHealth.Healthy;

timerFactory.VerifyTimer(0, 2000);
Assert.Equal(1, timerFactory.Count);
Assert.Equal(DestinationHealth.Unknown, destination.Health.Passive);
Assert.Throws<ObjectDisposedException>(() => timerFactory.FireTimer(0));
timerFactory.FireAll();

Assert.Equal(DestinationHealth.Healthy, destination.Health.Passive);
timerFactory.AssertTimerDisposed(0);
}
}
}
Loading

0 comments on commit aa87462

Please sign in to comment.