Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix against TimeSpan.Zero for max lifetime #72

Merged
merged 9 commits into from
Sep 18, 2024
17 changes: 4 additions & 13 deletions examples/Elastic.Channels.Continuous/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Threading.Channels;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;

Expand All @@ -15,8 +16,7 @@
{
BufferOptions = new BufferOptions()
{
OutboundBufferMaxSize = 10_000,
InboundBufferMaxSize = 10_000_000,
OutboundBufferMaxLifetime = TimeSpan.Zero
},
ExportBufferCallback = () => Console.Write("."),
ExportExceptionCallback = e => Console.Write("!"),
Expand All @@ -28,15 +28,6 @@
await Parallel.ForEachAsync(Enumerable.Range(0, int.MaxValue), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }, async (i, ctx) =>
{
var e = new NoopBufferedChannel.NoopEvent { Id = i };
var written = false;
//Console.Write('.');
var ready = await channel.WaitToWriteAsync(ctx);
if (ready) written = channel.TryWrite(e);
if (!written)
{
Console.WriteLine();
Console.WriteLine(channel);
Console.WriteLine(i);
Environment.Exit(1);
}
if (!await channel.WaitToWriteAsync(e))
Console.WriteLine(channel.OutstandingOperations);
});
11 changes: 10 additions & 1 deletion src/Elastic.Channels/BufferOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@ public class BufferOptions
/// </summary>
public int OutboundBufferMaxSize { get; set; } = 1_000;

private TimeSpan _outboundBufferMaxLifetime = TimeSpan.FromSeconds(5);
private readonly TimeSpan _outboundBufferMinLifetime = TimeSpan.FromSeconds(1);


/// <summary>
/// The maximum lifetime of a buffer to export to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/>.
/// If a buffer is older then the configured <see cref="OutboundBufferMaxLifetime"/> it will be flushed to
/// <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/> regardless of it's current size
/// <para>Defaults to <c>5 seconds</c></para>
/// <para>Any value less than <c>1 second</c> will be rounded back up to <c>1 second</c></para>
/// </summary>
public TimeSpan OutboundBufferMaxLifetime { get; set; } = TimeSpan.FromSeconds(5);
public TimeSpan OutboundBufferMaxLifetime
{
get => _outboundBufferMaxLifetime;
set => _outboundBufferMaxLifetime = value >= _outboundBufferMinLifetime ? value : _outboundBufferMaxLifetime;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensuring we can never set max life time to any thing less then 1s.

Otherwise each publish will cause an export, akin to setting OutboundBufferMaxSize to 1.

}

/// <summary>
/// The maximum number of consumers allowed to poll for new events on the channel.
Expand Down
22 changes: 19 additions & 3 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime)
private Channel<TEvent> InChannel { get; }
private BufferOptions BufferOptions => Options.BufferOptions;

private long _outstandingOperations = 0;
/// <summary>
///
/// </summary>
public long OutstandingOperations => _outstandingOperations;

internal InboundBuffer<TEvent> InboundBuffer { get; }

/// <inheritdoc cref="ChannelWriter{T}.WaitToWriteAsync"/>
Expand All @@ -175,6 +181,7 @@ public override bool TryWrite(TEvent item)
{
if (InChannel.Writer.TryWrite(item))
{
Interlocked.Increment(ref _outstandingOperations);
_callbacks.PublishToInboundChannelCallback?.Invoke();
return true;
}
Expand All @@ -199,26 +206,34 @@ public async Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, Cancell
/// <inheritdoc cref="IBufferedChannel{TEvent}.TryWriteMany"/>
public bool TryWriteMany(IEnumerable<TEvent> events)
{
var written = true;
var allWritten = true;

foreach (var @event in events)
{
written = TryWrite(@event);
var written = TryWrite(@event);
if (!written) allWritten = written;
}

return written;
return allWritten;
}

/// <inheritdoc cref="ChannelWriter{T}.WaitToWriteAsync"/>
public virtual async Task<bool> WaitToWriteAsync(TEvent item, CancellationToken ctx = default)
{
ctx = ctx == default ? TokenSource.Token : ctx;

// small interop to protect inbound task from cpu stealing
if (_outstandingOperations % BufferOptions.OutboundBufferMaxSize / 10 == 0)
Mpdreamz marked this conversation as resolved.
Show resolved Hide resolved
await Task.Delay(TimeSpan.FromMilliseconds(1), ctx).ConfigureAwait(false);

if (await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false) &&
InChannel.Writer.TryWrite(item))
{
Interlocked.Increment(ref _outstandingOperations);
_callbacks.PublishToInboundChannelCallback?.Invoke();
return true;
}

_callbacks.PublishToInboundChannelFailureCallback?.Invoke();
return false;
}
Expand Down Expand Up @@ -323,6 +338,7 @@ private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan max
while (InboundBuffer.Count < maxQueuedMessages && InChannel.Reader.TryRead(out var item))
{
InboundBuffer.Add(item);
Interlocked.Decrement(ref _outstandingOperations);

if (InboundBuffer.DurationSinceFirstWaitToRead >= maxInterval)
break;
Expand Down
Loading