From d15befb589866d0443f7f2b3e4bf6e5e462061fc Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 17 Sep 2024 11:14:14 +0200 Subject: [PATCH 1/9] Fix against TimeSpan.Zero for max lifetime --- .../Elastic.Channels.Continuous/Program.cs | 17 ++++---------- src/Elastic.Channels/BufferOptions.cs | 11 +++++++++- src/Elastic.Channels/BufferedChannelBase.cs | 22 ++++++++++++++++--- 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/examples/Elastic.Channels.Continuous/Program.cs b/examples/Elastic.Channels.Continuous/Program.cs index 7aa6b89..30e2abe 100644 --- a/examples/Elastic.Channels.Continuous/Program.cs +++ b/examples/Elastic.Channels.Continuous/Program.cs @@ -2,6 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System.Threading.Channels; using Elastic.Channels; using Elastic.Channels.Diagnostics; @@ -15,8 +16,7 @@ { BufferOptions = new BufferOptions() { - OutboundBufferMaxSize = 10_000, - InboundBufferMaxSize = 10_000_000, + OutboundBufferMaxLifetime = TimeSpan.Zero }, ExportBufferCallback = () => Console.Write("."), ExportExceptionCallback = e => Console.Write("!"), @@ -28,15 +28,6 @@ await Parallel.ForEachAsync(Enumerable.Range(0, int.MaxValue), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }, async (i, ctx) => { var e = new NoopBufferedChannel.NoopEvent { Id = i }; - var written = false; - //Console.Write('.'); - var ready = await channel.WaitToWriteAsync(ctx); - if (ready) written = channel.TryWrite(e); - if (!written) - { - Console.WriteLine(); - Console.WriteLine(channel); - Console.WriteLine(i); - Environment.Exit(1); - } + if (!await channel.WaitToWriteAsync(e)) + Console.WriteLine(channel.OutstandingOperations); }); diff --git a/src/Elastic.Channels/BufferOptions.cs b/src/Elastic.Channels/BufferOptions.cs index 2477558..3e4d3aa 100644 --- a/src/Elastic.Channels/BufferOptions.cs +++ b/src/Elastic.Channels/BufferOptions.cs @@ -25,13 +25,22 @@ public class BufferOptions /// public int OutboundBufferMaxSize { get; set; } = 1_000; + private TimeSpan _outboundBufferMaxLifetime = TimeSpan.FromSeconds(5); + private readonly TimeSpan _outboundBufferMinLifetime = TimeSpan.FromSeconds(1); + + /// /// The maximum lifetime of a buffer to export to . /// If a buffer is older then the configured it will be flushed to /// regardless of it's current size /// Defaults to 5 seconds + /// Any value less than 1 second will be rounded back up to 1 second /// - public TimeSpan OutboundBufferMaxLifetime { get; set; } = TimeSpan.FromSeconds(5); + public TimeSpan OutboundBufferMaxLifetime + { + get => _outboundBufferMaxLifetime; + set => _outboundBufferMaxLifetime = value >= _outboundBufferMinLifetime ? value : _outboundBufferMaxLifetime; + } /// /// The maximum number of consumers allowed to poll for new events on the channel. diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index fac03e5..6bb18e3 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -162,6 +162,12 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime) private Channel InChannel { get; } private BufferOptions BufferOptions => Options.BufferOptions; + private long _outstandingOperations = 0; + /// + /// + /// + public long OutstandingOperations => _outstandingOperations; + internal InboundBuffer InboundBuffer { get; } /// @@ -175,6 +181,7 @@ public override bool TryWrite(TEvent item) { if (InChannel.Writer.TryWrite(item)) { + Interlocked.Increment(ref _outstandingOperations); _callbacks.PublishToInboundChannelCallback?.Invoke(); return true; } @@ -199,26 +206,34 @@ public async Task WaitToWriteManyAsync(IEnumerable events, Cancell /// public bool TryWriteMany(IEnumerable events) { - var written = true; + var allWritten = true; foreach (var @event in events) { - written = TryWrite(@event); + var written = TryWrite(@event); + if (!written) allWritten = written; } - return written; + return allWritten; } /// public virtual async Task WaitToWriteAsync(TEvent item, CancellationToken ctx = default) { ctx = ctx == default ? TokenSource.Token : ctx; + + // small interop to protect inbound task from cpu stealing + if (_outstandingOperations % BufferOptions.OutboundBufferMaxSize / 10 == 0) + await Task.Delay(TimeSpan.FromMilliseconds(1), ctx).ConfigureAwait(false); + if (await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false) && InChannel.Writer.TryWrite(item)) { + Interlocked.Increment(ref _outstandingOperations); _callbacks.PublishToInboundChannelCallback?.Invoke(); return true; } + _callbacks.PublishToInboundChannelFailureCallback?.Invoke(); return false; } @@ -323,6 +338,7 @@ private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan max while (InboundBuffer.Count < maxQueuedMessages && InChannel.Reader.TryRead(out var item)) { InboundBuffer.Add(item); + Interlocked.Decrement(ref _outstandingOperations); if (InboundBuffer.DurationSinceFirstWaitToRead >= maxInterval) break; From e011dcfb3f89f681e526293e0ad46511e6c5a780 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 17 Sep 2024 13:22:05 +0200 Subject: [PATCH 2/9] Move blocking logic to WaitToWrriteAsync and use a more reliable implementation to ensure some drainage happened --- .../Elastic.Channels.Continuous/Program.cs | 4 +- src/Elastic.Channels/BufferedChannelBase.cs | 49 +++++++++++++------ 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/examples/Elastic.Channels.Continuous/Program.cs b/examples/Elastic.Channels.Continuous/Program.cs index 30e2abe..adbf6fe 100644 --- a/examples/Elastic.Channels.Continuous/Program.cs +++ b/examples/Elastic.Channels.Continuous/Program.cs @@ -14,7 +14,7 @@ var options = new NoopBufferedChannel.NoopChannelOptions { - BufferOptions = new BufferOptions() + BufferOptions = new BufferOptions { OutboundBufferMaxLifetime = TimeSpan.Zero }, @@ -29,5 +29,5 @@ { var e = new NoopBufferedChannel.NoopEvent { Id = i }; if (!await channel.WaitToWriteAsync(e)) - Console.WriteLine(channel.OutstandingOperations); + Console.WriteLine($" {channel.MaxConcurrency} {channel.OngoingExportOperations} -> {channel.OutstandingOperations}"); }); diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 6bb18e3..00e8d60 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -61,7 +61,8 @@ public abstract class BufferedChannelBase private readonly Task _inTask; private readonly Task _outTask; private readonly int _maxConcurrency; - private readonly SemaphoreSlim _throttleTasks; + private readonly SemaphoreSlim _throttleExportTasks; + private readonly SemaphoreSlim _throttleOutboundPublishes; private readonly CountdownEvent? _signal; private readonly ChannelCallbackInvoker _callbacks; @@ -102,7 +103,8 @@ protected BufferedChannelBase(TChannelOptions options, ICollection(new BoundedChannelOptions(maxIn) { @@ -162,16 +164,38 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime) private Channel InChannel { get; } private BufferOptions BufferOptions => Options.BufferOptions; - private long _outstandingOperations = 0; + private long _outstandingOperations; + /// /// /// public long OutstandingOperations => _outstandingOperations; + /// + /// The effective concurrency. + /// Either the configured concurrency + /// or the calculated concurrency. + /// + public int MaxConcurrency => _maxConcurrency; + + /// Outstanding export operations + public int OngoingExportOperations => _throttleExportTasks.CurrentCount; + internal InboundBuffer InboundBuffer { get; } /// - public override ValueTask WaitToWriteAsync(CancellationToken ctx = default) => InChannel.Writer.WaitToWriteAsync(ctx); + public override async ValueTask WaitToWriteAsync(CancellationToken ctx = default) + { + //provides an interop allowing sufficient drain of export tasks in cases where + //data is produced in a tight loop and might otherwise thread steal from + //export tasks + if (BufferOptions.BoundedChannelFullMode == BoundedChannelFullMode.Wait + && OngoingExportOperations >= MaxConcurrency) + await _throttleOutboundPublishes.WaitAsync(TokenSource.Token).ConfigureAwait(false); + //_throttleExportTasks.AvailableWaitHandle.WaitOne(TimeSpan.FromSeconds(1)); + + return await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false); + } /// public override bool TryComplete(Exception? error = null) => InChannel.Writer.TryComplete(error); @@ -222,12 +246,7 @@ public virtual async Task WaitToWriteAsync(TEvent item, CancellationToken { ctx = ctx == default ? TokenSource.Token : ctx; - // small interop to protect inbound task from cpu stealing - if (_outstandingOperations % BufferOptions.OutboundBufferMaxSize / 10 == 0) - await Task.Delay(TimeSpan.FromMilliseconds(1), ctx).ConfigureAwait(false); - - if (await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false) && - InChannel.Writer.TryWrite(item)) + if (await WaitToWriteAsync(ctx).ConfigureAwait(false) && InChannel.Writer.TryWrite(item)) { Interlocked.Increment(ref _outstandingOperations); _callbacks.PublishToInboundChannelCallback?.Invoke(); @@ -254,7 +273,6 @@ private async Task ConsumeOutboundEventsAsync() var taskList = new List(_maxConcurrency); while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) - // ReSharper disable once RemoveRedundantBraces { if (TokenSource.Token.IsCancellationRequested) break; if (_signal is { IsSet: true }) break; @@ -262,7 +280,7 @@ private async Task ConsumeOutboundEventsAsync() while (OutChannel.Reader.TryRead(out var buffer)) { var items = buffer.GetArraySegment(); - await _throttleTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false); + await _throttleExportTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false); var t = ExportBufferAsync(items, buffer); taskList.Add(t); @@ -271,7 +289,7 @@ private async Task ConsumeOutboundEventsAsync() var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false); taskList.Remove(completedTask); } - _throttleTasks.Release(); + _throttleExportTasks.Release(); } } await Task.WhenAll(taskList).ConfigureAwait(false); @@ -344,6 +362,8 @@ private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan max break; } + _throttleOutboundPublishes.Release(); + if (InboundBuffer.ThresholdsHit) await FlushBufferAsync().ConfigureAwait(false); } @@ -373,10 +393,9 @@ async Task AsyncSlowPath(IOutboundBuffer b) var maxRetries = Options.BufferOptions.ExportMaxRetries; for (var i = 0; i <= maxRetries; i++) while (await OutChannel.Writer.WaitToWriteAsync().ConfigureAwait(false)) - { if (OutChannel.Writer.TryWrite(b)) return true; - } + return false; } From 2f6828c76d951e6828829e38cda00e995b229644 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 17 Sep 2024 13:23:20 +0200 Subject: [PATCH 3/9] remove dead code --- src/Elastic.Channels/BufferedChannelBase.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 00e8d60..ce23d07 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -192,7 +192,6 @@ public override async ValueTask WaitToWriteAsync(CancellationToken ctx = d if (BufferOptions.BoundedChannelFullMode == BoundedChannelFullMode.Wait && OngoingExportOperations >= MaxConcurrency) await _throttleOutboundPublishes.WaitAsync(TokenSource.Token).ConfigureAwait(false); - //_throttleExportTasks.AvailableWaitHandle.WaitOne(TimeSpan.FromSeconds(1)); return await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false); } From 1b2f47e9bafeac5a0d3b639400a90632b831df60 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 17 Sep 2024 13:41:09 +0200 Subject: [PATCH 4/9] ensure noop export waits signicicantly long to force concurrency --- .../Diagnostics/NoopBufferedChannel.cs | 2 +- tests/Elastic.Channels.Tests/BehaviorTests.cs | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs index fe2dbc4..4c461bf 100644 --- a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs +++ b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs @@ -67,7 +67,7 @@ protected override async Task ExportAsync(ArraySegment if (!Options.TrackConcurrency) return new NoopResponse(); var max = Interlocked.Increment(ref _currentMax); - await Task.Delay(TimeSpan.FromMilliseconds(1), ctx).ConfigureAwait(false); + await Task.Delay(TimeSpan.FromMilliseconds(100), ctx).ConfigureAwait(false); Interlocked.Decrement(ref _currentMax); if (max > ObservedConcurrency) ObservedConcurrency = max; return new NoopResponse(); diff --git a/tests/Elastic.Channels.Tests/BehaviorTests.cs b/tests/Elastic.Channels.Tests/BehaviorTests.cs index 5af17e4..9e58183 100644 --- a/tests/Elastic.Channels.Tests/BehaviorTests.cs +++ b/tests/Elastic.Channels.Tests/BehaviorTests.cs @@ -15,7 +15,13 @@ namespace Elastic.Channels.Tests; public class BehaviorTests : IDisposable { - public BehaviorTests(ITestOutputHelper testOutput) => XunitContext.Register(testOutput); + private readonly ITestOutputHelper _testOutput; + + public BehaviorTests(ITestOutputHelper testOutput) + { + _testOutput = testOutput; + XunitContext.Register(testOutput); + } void IDisposable.Dispose() => XunitContext.Flush(); @@ -74,7 +80,7 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers() [Fact] public async Task ConcurrencyIsApplied() { - int totalEvents = 5_000, maxInFlight = 5_000, bufferSize = 500; + int totalEvents = 50_000, maxInFlight = 50_000, bufferSize = 5000; var expectedPages = totalEvents / bufferSize; var bufferOptions = new BufferOptions { @@ -85,7 +91,9 @@ [Fact] public async Task ConcurrencyIsApplied() }; var channel = new NoopBufferedChannel(bufferOptions, observeConcurrency: true); + channel.MaxConcurrency.Should().BeGreaterThan(1); + _testOutput.WriteLine($"{channel.MaxConcurrency}"); var written = 0; for (var i = 0; i < totalEvents; i++) { From 183d2e03194b2ac45608867d9bf7aef1886d64d1 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 17 Sep 2024 21:39:00 +0200 Subject: [PATCH 5/9] model draining before accepting new writes more explicitly --- .../Benchmarks/BulkIngestionBenchmarks.cs | 2 +- .../Program.cs | 23 +- .../Elastic.Channels.Continuous/Program.cs | 26 ++- src/Elastic.Channels/BufferedChannelBase.cs | 198 ++++++++++-------- .../Diagnostics/DiagnosticsBufferedChannel.cs | 9 +- tests/Elastic.Channels.Tests/BehaviorTests.cs | 4 +- .../CalculatedPropertyTests.cs | 56 +++++ 7 files changed, 193 insertions(+), 125 deletions(-) create mode 100644 tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs index 52fa94c..81360ba 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs @@ -70,6 +70,6 @@ public void BulkAll() channel.TryWriteMany(_data); channel.TryComplete(); - _waitHandle.WaitOne(); + // _waitHandle.WaitOne(); } } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs index 42ce2e1..c347608 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs @@ -14,29 +14,8 @@ using System.Globalization; using Elastic.Ingest.Elasticsearch.Benchmarks.Benchmarks; -#if DEBUG -// MANUALLY RUN A BENCHMARKED METHOD DURING DEBUGGING - -//var bm = new BulkIngestion(); -//bm.Setup(); -//await bm.BulkAllAsync(); -//Console.WriteLine("DONE"); - -var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks(); -bm.Setup(); -await bm.WriteToStreamAsync(); - -var length = bm.MemoryStream.Length; - -bm.MemoryStream.Position = 0; -var sr = new StreamReader(bm.MemoryStream); -var json = sr.ReadToEnd(); - -Console.ReadKey(); -#else var config = ManualConfig.Create(DefaultConfig.Instance); config.SummaryStyle = new SummaryStyle(CultureInfo.CurrentCulture, true, BenchmarkDotNet.Columns.SizeUnit.B, null!, ratioStyle: BenchmarkDotNet.Columns.RatioStyle.Percentage); config.AddDiagnoser(MemoryDiagnoser.Default); -BenchmarkRunner.Run(config); -#endif +BenchmarkRunner.Run(config); diff --git a/examples/Elastic.Channels.Continuous/Program.cs b/examples/Elastic.Channels.Continuous/Program.cs index adbf6fe..9f9698b 100644 --- a/examples/Elastic.Channels.Continuous/Program.cs +++ b/examples/Elastic.Channels.Continuous/Program.cs @@ -1,5 +1,4 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// Licensed to Elasticsearch B.V under one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information using System.Threading.Channels; @@ -14,20 +13,31 @@ var options = new NoopBufferedChannel.NoopChannelOptions { + //TrackConcurrency = true, BufferOptions = new BufferOptions { - OutboundBufferMaxLifetime = TimeSpan.Zero + OutboundBufferMaxLifetime = TimeSpan.Zero, + InboundBufferMaxSize = 1_000_000, + OutboundBufferMaxSize = 1_000_000 }, ExportBufferCallback = () => Console.Write("."), - ExportExceptionCallback = e => Console.Write("!"), - PublishToInboundChannelFailureCallback = () => Console.Write("I"), - PublishToOutboundChannelFailureCallback = () => Console.Write("O"), + ExportExceptionCallback = e => Console.Write("!") }; +Console.WriteLine("2"); var channel = new DiagnosticsBufferedChannel(options); +Console.WriteLine($"Begin: ({channel.OutboundStarted}) {channel.MaxConcurrency} {channel.BatchExportOperations} -> {channel.InflightEvents}"); await Parallel.ForEachAsync(Enumerable.Range(0, int.MaxValue), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }, async (i, ctx) => { var e = new NoopBufferedChannel.NoopEvent { Id = i }; - if (!await channel.WaitToWriteAsync(e)) - Console.WriteLine($" {channel.MaxConcurrency} {channel.OngoingExportOperations} -> {channel.OutstandingOperations}"); + if (await channel.WaitToWriteAsync(e)) + { + + } + + if (i % 10_000 == 0) + { + Console.Clear(); + Console.WriteLine(channel); + } }); diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index ce23d07..199f6ee 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -60,9 +61,7 @@ public abstract class BufferedChannelBase { private readonly Task _inTask; private readonly Task _outTask; - private readonly int _maxConcurrency; private readonly SemaphoreSlim _throttleExportTasks; - private readonly SemaphoreSlim _throttleOutboundPublishes; private readonly CountdownEvent? _signal; private readonly ChannelCallbackInvoker _callbacks; @@ -70,6 +69,58 @@ public abstract class BufferedChannelBase /// public IChannelDiagnosticsListener? DiagnosticsListener { get; } + /// The channel options currently in use + public TChannelOptions Options { get; } + + /// An overall cancellation token that may be externally provided + protected CancellationTokenSource TokenSource { get; } + + /// Internal cancellation token for signalling that all publishing activity has completed. + private readonly CancellationTokenSource _exitCancelSource = new(); + + private Channel> OutChannel { get; } + private Channel InChannel { get; } + private BufferOptions BufferOptions => Options.BufferOptions; + + private long _inflightEvents; + /// The number of inflight events + public long InflightEvents => _inflightEvents; + + /// Current number of tasks handling exporting the events + public int ExportTasks => _taskList.Count; + + /// + /// The effective concurrency. + /// Either the configured concurrency or the calculated concurrency. + /// + public int MaxConcurrency { get; } + + /// + /// The effective batch export size . + /// Either the configured concurrency or the calculated size. + /// If the configured exceeds ( / ) + /// the batch export size will be lowered to ( / ) to ensure we saturate + /// + public int BatchExportSize { get; } + + /// + /// If is set to + /// and approaches + /// will block until drops with atleast this size + /// + public int DrainSize { get; } + + private int _ongoingExportOperations; + /// Outstanding export operations + public int BatchExportOperations => _ongoingExportOperations; + private readonly CountdownEvent _waitForOutboundRead; + private List _taskList; + + /// + public bool OutboundStarted => _waitForOutboundRead.IsSet; + + internal InboundBuffer InboundBuffer { get; } + /// protected BufferedChannelBase(TChannelOptions options) : this(options, null) { } @@ -94,31 +145,23 @@ protected BufferedChannelBase(TChannelOptions options, ICollection(listeners); - var maxIn = Math.Max(1, BufferOptions.InboundBufferMaxSize); - // The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize - var maxOut = Math.Min(BufferOptions.InboundBufferMaxSize, Math.Max(1, BufferOptions.OutboundBufferMaxSize)); - var defaultMaxConcurrency = (int)Math.Ceiling(maxIn / (double)maxOut); - _maxConcurrency = - BufferOptions.ExportMaxConcurrency.HasValue - ? BufferOptions.ExportMaxConcurrency.Value - : Math.Min(defaultMaxConcurrency, Environment.ProcessorCount * 2); - - _throttleOutboundPublishes = new SemaphoreSlim(_maxConcurrency, _maxConcurrency); - _throttleExportTasks = new SemaphoreSlim(_maxConcurrency, _maxConcurrency); + var maxIn = Math.Max(Math.Max(1, BufferOptions.InboundBufferMaxSize), BufferOptions.OutboundBufferMaxSize); + var defaultMaxOut = Math.Max(1, BufferOptions.OutboundBufferMaxSize); + var calculatedConcurrency = (int)Math.Ceiling(maxIn / (double)defaultMaxOut); + var defaultConcurrency = Environment.ProcessorCount * 2; + MaxConcurrency = BufferOptions.ExportMaxConcurrency ?? Math.Min(calculatedConcurrency, defaultConcurrency); + + // The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize / (MaxConcurrency * 2) + BatchExportSize = Math.Min(BufferOptions.InboundBufferMaxSize / (MaxConcurrency), Math.Max(1, BufferOptions.OutboundBufferMaxSize)); + DrainSize = Math.Min(100_000, Math.Min(BatchExportSize * 2, maxIn / 2)); + + _taskList = new List(MaxConcurrency * 2); + + _throttleExportTasks = new SemaphoreSlim(MaxConcurrency, MaxConcurrency); _signal = options.BufferOptions.WaitHandle; - InChannel = Channel.CreateBounded(new BoundedChannelOptions(maxIn) - { - SingleReader = false, - SingleWriter = false, - // Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727 - // AFAICT this is fine since we run in a dedicated long running task. - AllowSynchronousContinuations = true, - // wait does not block it simply signals that Writer.TryWrite should return false and be retried - // DropWrite will make `TryWrite` always return true, which is not what we want. - FullMode = options.BufferOptions.BoundedChannelFullMode - }); + _waitForOutboundRead = new CountdownEvent(1); OutChannel = Channel.CreateBounded>( - new BoundedChannelOptions(_maxConcurrency * 2) + new BoundedChannelOptions(MaxConcurrency * 4) { SingleReader = false, SingleWriter = true, @@ -129,8 +172,19 @@ protected BufferedChannelBase(TChannelOptions options, ICollection(new BoundedChannelOptions(maxIn) + { + SingleReader = false, + SingleWriter = false, + // Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727 + // AFAICT this is fine since we run in a dedicated long running task. + AllowSynchronousContinuations = true, + // wait does not block it simply signals that Writer.TryWrite should return false and be retried + // DropWrite will make `TryWrite` always return true, which is not what we want. + FullMode = options.BufferOptions.BoundedChannelFullMode + }); - InboundBuffer = new InboundBuffer(maxOut, BufferOptions.OutboundBufferMaxLifetime); + InboundBuffer = new InboundBuffer(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime); _outTask = Task.Factory.StartNew(async () => await ConsumeOutboundEventsAsync().ConfigureAwait(false), @@ -139,7 +193,7 @@ await ConsumeOutboundEventsAsync().ConfigureAwait(false), TaskScheduler.Default); _inTask = Task.Factory.StartNew(async () => - await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false), + await ConsumeInboundEventsAsync(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false), CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, TaskScheduler.Default); @@ -151,48 +205,12 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime) /// protected abstract Task ExportAsync(ArraySegment buffer, CancellationToken ctx = default); - /// The channel options currently in use - public TChannelOptions Options { get; } - - /// An overall cancellation token that may be externally provided - protected CancellationTokenSource TokenSource { get; } - - /// Internal cancellation token for signalling that all publishing activity has completed. - private readonly CancellationTokenSource _exitCancelSource = new CancellationTokenSource(); - - private Channel> OutChannel { get; } - private Channel InChannel { get; } - private BufferOptions BufferOptions => Options.BufferOptions; - - private long _outstandingOperations; - - /// - /// - /// - public long OutstandingOperations => _outstandingOperations; - - /// - /// The effective concurrency. - /// Either the configured concurrency - /// or the calculated concurrency. - /// - public int MaxConcurrency => _maxConcurrency; - - /// Outstanding export operations - public int OngoingExportOperations => _throttleExportTasks.CurrentCount; - - internal InboundBuffer InboundBuffer { get; } - /// public override async ValueTask WaitToWriteAsync(CancellationToken ctx = default) { - //provides an interop allowing sufficient drain of export tasks in cases where - //data is produced in a tight loop and might otherwise thread steal from - //export tasks - if (BufferOptions.BoundedChannelFullMode == BoundedChannelFullMode.Wait - && OngoingExportOperations >= MaxConcurrency) - await _throttleOutboundPublishes.WaitAsync(TokenSource.Token).ConfigureAwait(false); - + if (BufferOptions.BoundedChannelFullMode == BoundedChannelFullMode.Wait && _inflightEvents >= BufferOptions.InboundBufferMaxSize - DrainSize) + while (_inflightEvents >= (BufferOptions.InboundBufferMaxSize - DrainSize)) + await Task.Delay(TimeSpan.FromMilliseconds(100), ctx).ConfigureAwait(false); return await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false); } @@ -204,7 +222,7 @@ public override bool TryWrite(TEvent item) { if (InChannel.Writer.TryWrite(item)) { - Interlocked.Increment(ref _outstandingOperations); + Interlocked.Increment(ref _inflightEvents); _callbacks.PublishToInboundChannelCallback?.Invoke(); return true; } @@ -247,7 +265,7 @@ public virtual async Task WaitToWriteAsync(TEvent item, CancellationToken if (await WaitToWriteAsync(ctx).ConfigureAwait(false) && InChannel.Writer.TryWrite(item)) { - Interlocked.Increment(ref _outstandingOperations); + Interlocked.Increment(ref _inflightEvents); _callbacks.PublishToInboundChannelCallback?.Invoke(); return true; } @@ -260,19 +278,19 @@ public virtual async Task WaitToWriteAsync(TEvent item, CancellationToken /// Subclasses may override this to yield items from that can be retried. /// The default implementation of this simply always returns an empty collection /// - protected virtual ArraySegment RetryBuffer(TResponse response, - ArraySegment currentBuffer, - IWriteTrackingBuffer statistics - ) => EmptyArraySegments.Empty; + protected virtual ArraySegment RetryBuffer(TResponse response, ArraySegment currentBuffer, IWriteTrackingBuffer statistics) => + EmptyArraySegments.Empty; private async Task ConsumeOutboundEventsAsync() { _callbacks.OutboundChannelStartedCallback?.Invoke(); - var taskList = new List(_maxConcurrency); + _taskList = new List(MaxConcurrency * 2); while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) { + if (_waitForOutboundRead is { IsSet: false }) + _waitForOutboundRead.Signal(); if (TokenSource.Token.IsCancellationRequested) break; if (_signal is { IsSet: true }) break; @@ -281,23 +299,24 @@ private async Task ConsumeOutboundEventsAsync() var items = buffer.GetArraySegment(); await _throttleExportTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false); var t = ExportBufferAsync(items, buffer); - taskList.Add(t); + _taskList.Add(t); - if (taskList.Count >= _maxConcurrency) + if (_taskList.Count >= MaxConcurrency) { - var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false); - taskList.Remove(completedTask); + var completedTask = await Task.WhenAny(_taskList).ConfigureAwait(false); + _taskList.Remove(completedTask); } _throttleExportTasks.Release(); } } - await Task.WhenAll(taskList).ConfigureAwait(false); + await Task.WhenAll(_taskList).ConfigureAwait(false); _exitCancelSource.Cancel(); _callbacks.OutboundChannelExitedCallback?.Invoke(); } private async Task ExportBufferAsync(ArraySegment items, IOutboundBuffer buffer) { + Interlocked.Increment(ref _ongoingExportOperations); using var outboundBuffer = buffer; var maxRetries = Options.BufferOptions.ExportMaxRetries; for (var i = 0; i <= maxRetries && items.Count > 0; i++) @@ -334,10 +353,11 @@ private async Task ExportBufferAsync(ArraySegment items, IOutboundBuffer await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false); _callbacks.ExportRetryCallback?.Invoke(items); } - // otherwise if retryable items still exist and the user wants to be notified notify the user + // otherwise if retryable items still exist and the user wants to be notified else if (items.Count > 0 && atEndOfRetries) _callbacks.ExportMaxRetriesCallback?.Invoke(items); } + Interlocked.Decrement(ref _ongoingExportOperations); _callbacks.ExportBufferCallback?.Invoke(); if (_signal is { IsSet: false }) _signal.Signal(); @@ -355,14 +375,12 @@ private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan max while (InboundBuffer.Count < maxQueuedMessages && InChannel.Reader.TryRead(out var item)) { InboundBuffer.Add(item); - Interlocked.Decrement(ref _outstandingOperations); + Interlocked.Decrement(ref _inflightEvents); if (InboundBuffer.DurationSinceFirstWaitToRead >= maxInterval) break; } - _throttleOutboundPublishes.Release(); - if (InboundBuffer.ThresholdsHit) await FlushBufferAsync().ConfigureAwait(false); } @@ -404,8 +422,20 @@ async Task AsyncSlowPath(IOutboundBuffer b) } /// > - public override string ToString() => - DiagnosticsListener != null ? DiagnosticsListener.ToString() : base.ToString(); + public override string ToString() + { + var sb = new StringBuilder(); + if (DiagnosticsListener != null) + sb.AppendLine(DiagnosticsListener.ToString()); + sb.AppendLine($"{nameof(InflightEvents)}: {InflightEvents:N0}"); + sb.AppendLine($"{nameof(BufferOptions.InboundBufferMaxSize)}: {BufferOptions.InboundBufferMaxSize:N0}"); + sb.AppendLine($"{nameof(BatchExportOperations)}: {BatchExportOperations:N0}"); + sb.AppendLine($"{nameof(BatchExportSize)}: {BatchExportSize:N0}"); + sb.AppendLine($"{nameof(DrainSize)}: {DrainSize:N0}"); + sb.AppendLine($"{nameof(MaxConcurrency)}: {MaxConcurrency:N0}"); + sb.AppendLine($"{nameof(ExportTasks)}: {ExportTasks:N0}"); + return sb.ToString(); + } /// public virtual void Dispose() diff --git a/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs index 12c3df2..621becc 100644 --- a/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs +++ b/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs @@ -41,15 +41,8 @@ protected override Task ExportAsync(ArraySegment buffer #else IList b = buffer; #endif - if (Options.BufferOptions.OutboundBufferMaxSize != buffer.Count) - { + if (BatchExportSize != buffer.Count) Interlocked.Increment(ref _bufferMismatches); - } - else if (b.Count > 0 && b[0].Id.HasValue) - { - if (b[0].Id % Options.BufferOptions.OutboundBufferMaxSize != 0) - Interlocked.Increment(ref _bufferMismatches); - } return base.ExportAsync(buffer, ctx); } diff --git a/tests/Elastic.Channels.Tests/BehaviorTests.cs b/tests/Elastic.Channels.Tests/BehaviorTests.cs index 9e58183..5c4be12 100644 --- a/tests/Elastic.Channels.Tests/BehaviorTests.cs +++ b/tests/Elastic.Channels.Tests/BehaviorTests.cs @@ -61,7 +61,7 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers() WaitHandle = new CountdownEvent(1), InboundBufferMaxSize = maxInFlight, OutboundBufferMaxSize = bufferSize, - OutboundBufferMaxLifetime = TimeSpan.FromMilliseconds(500) + OutboundBufferMaxLifetime = TimeSpan.FromSeconds(1) }; var channel = new NoopBufferedChannel(bufferOptions); @@ -72,7 +72,7 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers() if (await channel.WaitToWriteAsync(e)) written++; } - var signalled = bufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(1)); + var signalled = bufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(2)); signalled.Should().BeTrue("The channel was not drained in the expected time"); written.Should().Be(100); channel.ExportedBuffers.Should().Be(1); diff --git a/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs b/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs new file mode 100644 index 0000000..a1c704e --- /dev/null +++ b/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Channels.Diagnostics; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Channels.Tests; + +public class CalculatedPropertyTests : IDisposable +{ + private readonly ITestOutputHelper _testOutput; + + public CalculatedPropertyTests(ITestOutputHelper testOutput) + { + _testOutput = testOutput; + XunitContext.Register(testOutput); + } + + void IDisposable.Dispose() => XunitContext.Flush(); + + [Theory] + [InlineData(500_000, 50_000, 100_000)] + [InlineData(10_00_000, 50_000, 100_000)] + [InlineData(50_000, 50_000, 25_000)] + [InlineData(10_000, 50_000, 20_000)] + [InlineData(10_00_000, 1_000, 2_000)] + public void BatchExportSizeAndDrainSizeConstraints(int maxInFlight, int bufferSize, int drainSize) + { + var bufferOptions = new BufferOptions + { + InboundBufferMaxSize = maxInFlight, + OutboundBufferMaxSize = bufferSize, + }; + var channel = new NoopBufferedChannel(bufferOptions); + + var expectedConcurrency = + Math.Max(1, Math.Min(maxInFlight / bufferSize, Environment.ProcessorCount * 2)); + channel.MaxConcurrency.Should().Be(expectedConcurrency); + if (maxInFlight >= bufferSize) + channel.BatchExportSize.Should().Be(bufferSize); + else + channel.BatchExportSize.Should().Be(maxInFlight / expectedConcurrency); + + // drain size is max'ed out at 100_000 + channel.DrainSize.Should().Be(Math.Min(100_000, drainSize)); + + } + +} From fbff9bedb6108fc026b2444f0d15ce5324f49f3f Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 17 Sep 2024 21:41:56 +0200 Subject: [PATCH 6/9] fix typo --- tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs b/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs index a1c704e..1f2b707 100644 --- a/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs +++ b/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs @@ -48,7 +48,7 @@ public void BatchExportSizeAndDrainSizeConstraints(int maxInFlight, int bufferSi else channel.BatchExportSize.Should().Be(maxInFlight / expectedConcurrency); - // drain size is max'ed out at 100_000 + // drain size is maxed out at 100_000 channel.DrainSize.Should().Be(Math.Min(100_000, drainSize)); } From 55ebdc402e51c94d9683750e91eae834f6b4c6d2 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 17 Sep 2024 21:55:05 +0200 Subject: [PATCH 7/9] fix tests --- examples/Elastic.Channels.Continuous/Program.cs | 3 ++- src/Elastic.Channels/BufferedChannelBase.cs | 4 ++-- tests/Elastic.Channels.Tests/BehaviorTests.cs | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/examples/Elastic.Channels.Continuous/Program.cs b/examples/Elastic.Channels.Continuous/Program.cs index 9f9698b..5d2b1df 100644 --- a/examples/Elastic.Channels.Continuous/Program.cs +++ b/examples/Elastic.Channels.Continuous/Program.cs @@ -1,4 +1,5 @@ -// Licensed to Elasticsearch B.V under one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information using System.Threading.Channels; diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 199f6ee..2df7e48 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -424,9 +424,9 @@ async Task AsyncSlowPath(IOutboundBuffer b) /// > public override string ToString() { + if (DiagnosticsListener == null) return base.ToString(); var sb = new StringBuilder(); - if (DiagnosticsListener != null) - sb.AppendLine(DiagnosticsListener.ToString()); + sb.AppendLine(DiagnosticsListener.ToString()); sb.AppendLine($"{nameof(InflightEvents)}: {InflightEvents:N0}"); sb.AppendLine($"{nameof(BufferOptions.InboundBufferMaxSize)}: {BufferOptions.InboundBufferMaxSize:N0}"); sb.AppendLine($"{nameof(BatchExportOperations)}: {BatchExportOperations:N0}"); diff --git a/tests/Elastic.Channels.Tests/BehaviorTests.cs b/tests/Elastic.Channels.Tests/BehaviorTests.cs index 5c4be12..c2d33dc 100644 --- a/tests/Elastic.Channels.Tests/BehaviorTests.cs +++ b/tests/Elastic.Channels.Tests/BehaviorTests.cs @@ -153,13 +153,13 @@ Task StartChannel(int taskNumber) [Fact] public async Task SlowlyPushEvents() { int totalEvents = 50_000_000, maxInFlight = totalEvents / 5, bufferSize = maxInFlight / 10; - var expectedSentBuffers = totalEvents / bufferSize; + var expectedSentBuffers = totalEvents / 10_000; var bufferOptions = new BufferOptions { WaitHandle = new CountdownEvent(expectedSentBuffers), InboundBufferMaxSize = maxInFlight, OutboundBufferMaxSize = 10_000, - OutboundBufferMaxLifetime = TimeSpan.FromMilliseconds(100) + OutboundBufferMaxLifetime = TimeSpan.FromMilliseconds(1000) }; using var channel = new DiagnosticsBufferedChannel(bufferOptions, name: $"Slow push channel"); await Task.Delay(TimeSpan.FromMilliseconds(200)); @@ -175,7 +175,7 @@ [Fact] public async Task SlowlyPushEvents() } }, TaskCreationOptions.LongRunning); // wait for some work to have progressed - bufferOptions.WaitHandle.Wait(TimeSpan.FromMilliseconds(500)); + bufferOptions.WaitHandle.Wait(TimeSpan.FromMilliseconds(2000)); //Ensure we written to the channel but not enough to satisfy OutboundBufferMaxSize written.Should().BeGreaterThan(0).And.BeLessThan(10_000); //even though OutboundBufferMaxSize was not hit we should still observe an invocation to Export() From bbf5dc2785d7da2c0a269823e9175cb4c418676e Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 18 Sep 2024 08:47:26 +0200 Subject: [PATCH 8/9] Ensure WaitToWriteAsync is deterministic (in that it will not wait forever if no drain happens) --- src/Elastic.Channels/BufferedChannelBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 2df7e48..ae2128c 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -209,7 +209,7 @@ await ConsumeInboundEventsAsync(BatchExportSize, BufferOptions.OutboundBufferMax public override async ValueTask WaitToWriteAsync(CancellationToken ctx = default) { if (BufferOptions.BoundedChannelFullMode == BoundedChannelFullMode.Wait && _inflightEvents >= BufferOptions.InboundBufferMaxSize - DrainSize) - while (_inflightEvents >= (BufferOptions.InboundBufferMaxSize - DrainSize)) + for (var i = 0; i < 10 && _inflightEvents >= BufferOptions.InboundBufferMaxSize - DrainSize; i++) await Task.Delay(TimeSpan.FromMilliseconds(100), ctx).ConfigureAwait(false); return await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false); } From dd7ae529596e46db2367b7d74cbdf992e5d2083e Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 18 Sep 2024 14:29:07 +0200 Subject: [PATCH 9/9] revert changes to benchmark project --- .../Benchmarks/BulkIngestionBenchmarks.cs | 2 +- .../Program.cs | 23 ++++++++++++++++++- elastic-ingest-dotnet.sln.DotSettings | 5 ++-- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs index 81360ba..52fa94c 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs @@ -70,6 +70,6 @@ public void BulkAll() channel.TryWriteMany(_data); channel.TryComplete(); - // _waitHandle.WaitOne(); + _waitHandle.WaitOne(); } } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs index c347608..2564764 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs @@ -14,8 +14,29 @@ using System.Globalization; using Elastic.Ingest.Elasticsearch.Benchmarks.Benchmarks; +#if DEBUG +// MANUALLY RUN A BENCHMARKED METHOD DURING DEBUGGING + +//var bm = new BulkIngestion(); +//bm.Setup(); +//await bm.BulkAllAsync() +//Console.WriteLine("DONE"); + +var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks(); +bm.Setup(); +await bm.WriteToStreamAsync(); + +var length = bm.MemoryStream.Length; + +bm.MemoryStream.Position = 0; +var sr = new StreamReader(bm.MemoryStream); +var json = sr.ReadToEnd(); + +Console.ReadKey(); +#else var config = ManualConfig.Create(DefaultConfig.Instance); config.SummaryStyle = new SummaryStyle(CultureInfo.CurrentCulture, true, BenchmarkDotNet.Columns.SizeUnit.B, null!, ratioStyle: BenchmarkDotNet.Columns.RatioStyle.Percentage); config.AddDiagnoser(MemoryDiagnoser.Default); -BenchmarkRunner.Run(config); +BenchmarkRunner.Run(config); +#endif diff --git a/elastic-ingest-dotnet.sln.DotSettings b/elastic-ingest-dotnet.sln.DotSettings index 172fc64..673b0d8 100644 --- a/elastic-ingest-dotnet.sln.DotSettings +++ b/elastic-ingest-dotnet.sln.DotSettings @@ -122,7 +122,7 @@ See the LICENSE file in the project root for more information </Entry.Match> <Entry.SortBy> <Kind Is="Member" /> - <Name Is="Enter Pattern Here" /> + <Name /> </Entry.SortBy> </Entry> <Entry DisplayName="Readonly Fields"> @@ -140,7 +140,7 @@ See the LICENSE file in the project root for more information <Entry.SortBy> <Access /> <Readonly /> - <Name Is="Enter Pattern Here" /> + <Name /> </Entry.SortBy> </Entry> <Entry DisplayName="Constructors"> @@ -415,6 +415,7 @@ See the LICENSE file in the project root for more information True True True + True True True False