Skip to content

Commit

Permalink
Initial support for forward propagation after a threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
Turnerj committed May 16, 2020
1 parent cc7cd0c commit 71f8b13
Show file tree
Hide file tree
Showing 24 changed files with 201 additions and 130 deletions.
4 changes: 2 additions & 2 deletions src/CacheTower.Extensions.Redis/RedisLockExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void Register(ICacheStack cacheStack)
RegisteredStack = cacheStack;
}

public async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func<ValueTask<CacheEntry<T>>> valueProvider, CacheSettings settings)
public async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func<ValueTask<CacheEntry<T>>> valueProvider, CacheEntryLifetime settings)
{
var hasLock = await Database.StringSetAsync(cacheKey, RedisValue.EmptyString, expiry: LockTimeout, when: When.NotExists);

Expand All @@ -82,7 +82,7 @@ public async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func
}
}

private async Task<CacheEntry<T>> WaitForResult<T>(string cacheKey, CacheSettings settings)
private async Task<CacheEntry<T>> WaitForResult<T>(string cacheKey, CacheEntryLifetime settings)
{
var delayedResultSource = new TaskCompletionSource<bool>();
var waitList = new[] { delayedResultSource };
Expand Down
14 changes: 12 additions & 2 deletions src/CacheTower/CacheEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,17 @@ namespace CacheTower
{
public abstract class CacheEntry
{
/// <summary>
/// The absolute expiry date of the <see cref="CacheEntry"/>.
/// </summary>
public DateTime Expiry { get; }
/// <summary>
/// The number of in-memory cache hits the <see cref="CacheEntry"/> has had.
/// </summary>
public int CacheHitCount => _CacheHitCount;

internal int _CacheHitCount;
internal bool _HasBeenForwardPropagated;

protected CacheEntry(DateTime expiry)
{
Expand All @@ -18,9 +28,9 @@ protected CacheEntry(DateTime expiry)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public DateTime GetStaleDate(CacheSettings cacheSettings)
public DateTime GetStaleDate(CacheEntryLifetime entryLifetime)
{
return Expiry - cacheSettings.TimeToLive + cacheSettings.StaleAfter;
return Expiry - entryLifetime.TimeToLive + entryLifetime.StaleAfter;
}
}

Expand Down
24 changes: 24 additions & 0 deletions src/CacheTower/CacheEntryLifetime.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace CacheTower
{
public struct CacheEntryLifetime
{
public TimeSpan TimeToLive { get; }
public TimeSpan StaleAfter { get; }

public CacheEntryLifetime(TimeSpan timeToLive)
{
TimeToLive = timeToLive;
StaleAfter = TimeSpan.Zero;
}

public CacheEntryLifetime(TimeSpan timeToLive, TimeSpan staleAfter)
{
TimeToLive = timeToLive;
StaleAfter = staleAfter;
}
}
}
19 changes: 5 additions & 14 deletions src/CacheTower/CacheSettings.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
using CacheTower.Providers.Memory;

namespace CacheTower
{
public struct CacheSettings
{
public TimeSpan TimeToLive { get; }
public TimeSpan StaleAfter { get; }

public CacheSettings(TimeSpan timeToLive)
{
TimeToLive = timeToLive;
StaleAfter = TimeSpan.Zero;
}

public CacheSettings(TimeSpan timeToLive, TimeSpan staleAfter)
{
TimeToLive = timeToLive;
StaleAfter = staleAfter;
}
/// <summary>
/// The number of cache hits before forward propagating from a <see cref="MemoryCacheLayer" /> to higher level caches.
/// </summary>
public uint ForwardPropagateAfterXCacheHits { get; set; }
}
}
159 changes: 102 additions & 57 deletions src/CacheTower/CacheStack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using CacheTower.Extensions;
using CacheTower.Providers.Memory;
using Microsoft.Extensions.DependencyInjection;

namespace CacheTower
Expand Down Expand Up @@ -89,17 +90,17 @@ public async ValueTask EvictAsync(string cacheKey)
}
}

public async ValueTask<CacheEntry<T>> SetAsync<T>(string cacheKey, T value, TimeSpan timeToLive)
public async ValueTask<CacheEntry<T>> SetAsync<T>(string cacheKey, T value, TimeSpan timeToLive, CacheSettings settings = default)
{
ThrowIfDisposed();

var expiry = DateTime.UtcNow + timeToLive;
var cacheEntry = new CacheEntry<T>(value, expiry);
await SetAsync(cacheKey, cacheEntry);
await SetAsync(cacheKey, cacheEntry, settings);
return cacheEntry;
}

public async ValueTask SetAsync<T>(string cacheKey, CacheEntry<T> cacheEntry)
public async ValueTask SetAsync<T>(string cacheKey, CacheEntry<T> cacheEntry, CacheSettings settings = default)
{
ThrowIfDisposed();

Expand All @@ -113,16 +114,23 @@ public async ValueTask SetAsync<T>(string cacheKey, CacheEntry<T> cacheEntry)
throw new ArgumentNullException(nameof(cacheEntry));
}

for (int i = 0, l = CacheLayers.Length; i < l; i++)
if (settings.ForwardPropagateAfterXCacheHits > 0 && CacheLayers[0] is MemoryCacheLayer memoryCacheLayer)
{
var layer = CacheLayers[i];
if (layer is ISyncCacheLayer syncLayerOne)
{
syncLayerOne.Set(cacheKey, cacheEntry);
}
else
memoryCacheLayer.Set(cacheKey, cacheEntry);
}
else
{
for (int i = 0, l = CacheLayers.Length; i < l; i++)
{
await (layer as IAsyncCacheLayer).SetAsync(cacheKey, cacheEntry);
var layer = CacheLayers[i];
if (layer is ISyncCacheLayer syncLayerOne)
{
syncLayerOne.Set(cacheKey, cacheEntry);
}
else
{
await (layer as IAsyncCacheLayer).SetAsync(cacheKey, cacheEntry);
}
}
}
}
Expand Down Expand Up @@ -201,7 +209,7 @@ public async ValueTask<CacheEntry<T>> GetAsync<T>(string cacheKey)
return default;
}

public async ValueTask<T> GetOrSetAsync<T>(string cacheKey, Func<T, Task<T>> getter, CacheSettings settings)
public async ValueTask<T> GetOrSetAsync<T>(string cacheKey, Func<T, Task<T>> getter, CacheEntryLifetime entryLifetime, CacheSettings settings = default)
{
ThrowIfDisposed();

Expand All @@ -220,13 +228,14 @@ public async ValueTask<T> GetOrSetAsync<T>(string cacheKey, Func<T, Task<T>> get
{
var cacheEntry = cacheEntryPoint.CacheEntry;
var currentTime = DateTime.UtcNow;
if (cacheEntry.GetStaleDate(settings) < currentTime)
var isStale = cacheEntry.GetStaleDate(entryLifetime) < currentTime;
if (isStale)
{
if (cacheEntry.Expiry < currentTime)
{
//Refresh the value in the current thread though short circuit if we're unable to establish a lock
//If the lock isn't established, it will instead use the stale cache entry (even if past the allowed stale period)
var refreshedCacheEntry = await RefreshValueAsync(cacheKey, getter, settings, waitForRefresh: false);
var refreshedCacheEntry = await RefreshValueAsync(cacheKey, getter, entryLifetime, settings, waitForRefresh: false);
if (refreshedCacheEntry != default)
{
cacheEntry = refreshedCacheEntry;
Expand All @@ -235,62 +244,56 @@ public async ValueTask<T> GetOrSetAsync<T>(string cacheKey, Func<T, Task<T>> get
else
{
//Refresh the value in the background
_ = RefreshValueAsync(cacheKey, getter, settings, waitForRefresh: false);
_ = RefreshValueAsync(cacheKey, getter, entryLifetime, settings, waitForRefresh: false);
}
}
else if (cacheEntryPoint.LayerIndex > 0)
else
{
//If a lower-level cache is missing the latest data, attempt to set it in the background
_ = BackPopulateCacheAsync(cacheEntryPoint.LayerIndex, cacheKey, cacheEntry);
if (cacheEntryPoint.LayerIndex > 0)
{
//If a lower-level cache (eg. a memory cache) is missing the latest data, attempt to set it in the background
_ = BackPropagateCacheEntryAsync(cacheEntryPoint.LayerIndex, cacheKey, cacheEntry);
}
else if (!cacheEntry._HasBeenForwardPropagated && settings.ForwardPropagateAfterXCacheHits > 0 && cacheEntry.CacheHitCount >= settings.ForwardPropagateAfterXCacheHits)
{
//If enabled, we push the local cache entry to higher-level caches, doing so in the background
_ = ForwardPropagateCacheEntryAsync(cacheEntryPoint.LayerIndex + 1, cacheKey, cacheEntry);
}
}

return cacheEntry.Value;
}
else
{
//Refresh the value in the current thread though because we have no old cache value, we have to lock and wait
return (await RefreshValueAsync(cacheKey, getter, settings, waitForRefresh: true)).Value;
return (await RefreshValueAsync(cacheKey, getter, entryLifetime, settings, waitForRefresh: true)).Value;
}
}

private async ValueTask BackPopulateCacheAsync<T>(int fromIndexExclusive, string cacheKey, CacheEntry<T> cacheEntry)
private async ValueTask BackPropagateCacheEntryAsync<T>(int fromIndexExclusive, string cacheKey, CacheEntry<T> cacheEntry)
{
ThrowIfDisposed();

var hasLock = false;
lock (WaitingKeyRefresh)
{
#if NETSTANDARD2_0
hasLock = !WaitingKeyRefresh.ContainsKey(cacheKey);
if (hasLock)
{
WaitingKeyRefresh[cacheKey] = Array.Empty<TaskCompletionSource<object>>();
}
#elif NETSTANDARD2_1
hasLock = WaitingKeyRefresh.TryAdd(cacheKey, Array.Empty<TaskCompletionSource<object>>());
#endif
}

if (hasLock)
if (TryGetKeyRefreshLock(cacheKey))
{
try
{
for (; --fromIndexExclusive >= 0;)
{
var previousLayer = CacheLayers[fromIndexExclusive];
if (previousLayer is ISyncCacheLayer prevSyncLayer)
var cacheLayer = CacheLayers[fromIndexExclusive];
if (cacheLayer is ISyncCacheLayer syncLayer)
{
if (prevSyncLayer.IsAvailable(cacheKey))
if (syncLayer.IsAvailable(cacheKey))
{
prevSyncLayer.Set(cacheKey, cacheEntry);
syncLayer.Set(cacheKey, cacheEntry);
}
}
else
{
var prevAsyncLayer = previousLayer as IAsyncCacheLayer;
if (await prevAsyncLayer.IsAvailableAsync(cacheKey))
var asyncCacheLayer = cacheLayer as IAsyncCacheLayer;
if (await asyncCacheLayer.IsAvailableAsync(cacheKey))
{
await prevAsyncLayer.SetAsync(cacheKey, cacheEntry);
await asyncCacheLayer.SetAsync(cacheKey, cacheEntry);
}
}
}
Expand All @@ -302,25 +305,48 @@ private async ValueTask BackPopulateCacheAsync<T>(int fromIndexExclusive, string
}
}

private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func<T, Task<T>> getter, CacheSettings settings, bool waitForRefresh)
private async ValueTask ForwardPropagateCacheEntryAsync<T>(int fromIndexExclusive, string cacheKey, CacheEntry<T> cacheEntry)
{
ThrowIfDisposed();

var hasLock = false;
lock (WaitingKeyRefresh)
if (TryGetKeyRefreshLock(cacheKey) && cacheEntry._HasBeenForwardPropagated)
{
#if NETSTANDARD2_0
hasLock = !WaitingKeyRefresh.ContainsKey(cacheKey);
if (hasLock)
try
{
WaitingKeyRefresh[cacheKey] = Array.Empty<TaskCompletionSource<object>>();
for (; ++fromIndexExclusive < CacheLayers.Length;)
{
var cacheLayer = CacheLayers[fromIndexExclusive];
if (cacheLayer is ISyncCacheLayer syncLayer)
{
if (syncLayer.IsAvailable(cacheKey))
{
syncLayer.Set(cacheKey, cacheEntry);
}
}
else
{
var asyncCacheLayer = cacheLayer as IAsyncCacheLayer;
if (await asyncCacheLayer.IsAvailableAsync(cacheKey))
{
await asyncCacheLayer.SetAsync(cacheKey, cacheEntry);
}
}
}

cacheEntry._HasBeenForwardPropagated = true;
}
finally
{
UnlockWaitingTasks(cacheKey, cacheEntry);
}
#elif NETSTANDARD2_1
hasLock = WaitingKeyRefresh.TryAdd(cacheKey, Array.Empty<TaskCompletionSource<object>>());
#endif
}
}

if (hasLock)
private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func<T, Task<T>> getter, CacheEntryLifetime entryLifetime, CacheSettings settings, bool waitForRefresh)
{
ThrowIfDisposed();

if (TryGetKeyRefreshLock(cacheKey))
{
try
{
Expand All @@ -335,14 +361,14 @@ private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Fun
}

var value = await getter(oldValue);
var refreshedEntry = await SetAsync(cacheKey, value, settings.TimeToLive);
var refreshedEntry = await SetAsync(cacheKey, value, entryLifetime.TimeToLive, settings);

_ = Extensions.OnValueRefreshAsync(cacheKey, settings.TimeToLive);
_ = Extensions.OnValueRefreshAsync(cacheKey, entryLifetime.TimeToLive);

UnlockWaitingTasks(cacheKey, refreshedEntry);

return refreshedEntry;
}, settings);
}, entryLifetime);
}
catch
{
Expand All @@ -369,7 +395,7 @@ private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Fun

//Last minute check to confirm whether waiting is required
var currentEntry = await GetAsync<T>(cacheKey);
if (currentEntry != null && currentEntry.GetStaleDate(settings) > DateTime.UtcNow)
if (currentEntry != null && currentEntry.GetStaleDate(entryLifetime) > DateTime.UtcNow)
{
UnlockWaitingTasks(cacheKey, currentEntry);
return currentEntry;
Expand Down Expand Up @@ -400,6 +426,25 @@ private void UnlockWaitingTasks(string cacheKey, CacheEntry cacheEntry)
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool TryGetKeyRefreshLock(string cacheKey)
{
var hasLock = false;
lock (WaitingKeyRefresh)
{
#if NETSTANDARD2_0
hasLock = !WaitingKeyRefresh.ContainsKey(cacheKey);
if (hasLock)
{
WaitingKeyRefresh[cacheKey] = Array.Empty<TaskCompletionSource<object>>();
}
#elif NETSTANDARD2_1
hasLock = WaitingKeyRefresh.TryAdd(cacheKey, Array.Empty<TaskCompletionSource<object>>());
#endif
}
return hasLock;
}

#if NETSTANDARD2_0
public void Dispose()
{
Expand Down
Loading

0 comments on commit 71f8b13

Please sign in to comment.