diff --git a/src/Elastic.NLog.Targets/ElasticsearchTarget.cs b/src/Elastic.NLog.Targets/ElasticsearchTarget.cs index c43e6236..81d0a6fb 100644 --- a/src/Elastic.NLog.Targets/ElasticsearchTarget.cs +++ b/src/Elastic.NLog.Targets/ElasticsearchTarget.cs @@ -4,9 +4,11 @@ using Elastic.Channels; using Elastic.Channels.Buffers; using Elastic.Channels.Diagnostics; +using Elastic.CommonSchema.NLog; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.CommonSchema; using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; @@ -24,7 +26,7 @@ public class ElasticsearchTarget : TargetWithLayout /// public override Layout Layout { get => _layout; set => _layout = value as Elastic.CommonSchema.NLog.EcsLayout ?? _layout; } private Elastic.CommonSchema.NLog.EcsLayout _layout = new Elastic.CommonSchema.NLog.EcsLayout(); - private EcsDataStreamChannel? _channel; + private IBufferedChannel? _channel; /// /// Gets or sets the connection pool type. Default for multiple nodes is Sniffing; other supported values are @@ -51,6 +53,41 @@ public class ElasticsearchTarget : TargetWithLayout /// User-configurable arbitrary grouping public Layout? DataStreamNamespace { get; set; } = "default"; + /// + /// Gets or sets the format string for the Elastic search index. The current DateTimeOffset is passed as parameter 0. + /// + /// Example: "dotnet-{0:yyyy.MM.dd}" + /// If no {0} parameter is defined the index name is effectively fixed + /// + public Layout? IndexFormat { get; set; } + + /// + /// Gets or sets the offset to use for the index DateTimeOffset. Default value is null, which uses the system local offset. + /// Use "0" for UTC. + /// + public Layout? IndexOffsetHours { get; set; } + + /// + /// Control the operation header for each bulk operation. Default value is Auto. + /// + /// Can explicit specify Auto, Index or Create + /// + public OperationMode IndexOperation { get; set; } + + /// + /// Gets or sets the optional override of the per document `_id`. + /// + public Layout? IndexEventId + { + get => _layout.EventId; + set + { + _layout.EventId = value; + _hasIndexEventId = value is not null; + } + } + private bool _hasIndexEventId; + /// /// The maximum number of in flight instances that can be queued in memory. If this threshold is reached, events will be dropped /// Defaults to 100_000 @@ -124,7 +161,7 @@ public Layout? CloudId /// /// Provide callbacks to further configure /// - public Action>? ConfigureChannel { get; set; } + public Action>? ConfigureChannel { get; set; } /// public IChannelDiagnosticsListener? DiagnosticsListener => _channel?.DiagnosticsListener; @@ -132,10 +169,9 @@ public Layout? CloudId /// protected override void InitializeTarget() { - var ilmPolicy = IlmPolicy?.Render(LogEventInfo.CreateNullEvent()); - var dataStreamType = DataStreamType?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty; - var dataStreamSet = DataStreamSet?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty; - var dataStreamNamespace = DataStreamNamespace?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty; + var indexFormat = IndexFormat?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty; + var indexOffsetHours = IndexOffsetHours?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty; + var indexOffset = string.IsNullOrEmpty(indexOffsetHours) ? default(TimeSpan?) : TimeSpan.FromHours(int.Parse(indexOffsetHours)); var connectionPool = CreateNodePool(); var config = new TransportConfiguration(connectionPool, productRegistration: ElasticsearchProductRegistration.Default); @@ -144,11 +180,18 @@ protected override void InitializeTarget() config = SetAuthenticationOnTransport(config); var transport = new DistributedTransport(config); - var channelOptions = new DataStreamChannelOptions(transport) + if (!string.IsNullOrEmpty(indexFormat)) { - DataStream = new DataStreamName(dataStreamType, dataStreamSet, dataStreamNamespace), - WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false), - }; + _channel = CreateIndexChannel(transport, indexFormat, indexOffset, IndexOperation); + } + else + { + _channel = CreateDataStreamChannel(transport); + } + } + + private void SetupChannelOptions(ElasticsearchChannelOptionsBase channelOptions) + { if (InboundBufferMaxSize > 0) channelOptions.BufferOptions.InboundBufferMaxSize = InboundBufferMaxSize; if (OutboundBufferMaxSize > 0) @@ -160,10 +203,43 @@ protected override void InitializeTarget() if (ExportMaxRetries >= 0) channelOptions.BufferOptions.ExportMaxRetries = ExportMaxRetries; ConfigureChannel?.Invoke(channelOptions); + } + private EcsDataStreamChannel CreateDataStreamChannel(DistributedTransport transport) + { + var ilmPolicy = IlmPolicy?.Render(LogEventInfo.CreateNullEvent()); + var dataStreamType = DataStreamType?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty; + var dataStreamSet = DataStreamSet?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty; + var dataStreamNamespace = DataStreamNamespace?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty; + var channelOptions = new DataStreamChannelOptions(transport) + { + DataStream = new DataStreamName(dataStreamType, dataStreamSet, dataStreamNamespace), + WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false), + }; + SetupChannelOptions(channelOptions); var channel = new EcsDataStreamChannel(channelOptions, new[] { new InternalLoggerCallbackListener() }); channel.BootstrapElasticsearch(BootstrapMethod, ilmPolicy); - _channel = channel; + return channel; + } + + private EcsIndexChannel CreateIndexChannel(DistributedTransport transport, string indexFormat, TimeSpan? indexOffset, OperationMode indexOperation) + { + var indexChannelOptions = new IndexChannelOptions(transport) + { + IndexFormat = indexFormat, + IndexOffset = indexOffset, + WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false), + TimestampLookup = l => l.Timestamp, + OperationMode = indexOperation, + }; + + if (_hasIndexEventId) + { + indexChannelOptions.BulkOperationIdLookup = (logEvent) => (logEvent.Event?.Id)!; + } + + SetupChannelOptions(indexChannelOptions); + return new EcsIndexChannel(indexChannelOptions); } /// diff --git a/src/Elastic.NLog.Targets/README.md b/src/Elastic.NLog.Targets/README.md index dd15a9e1..d0aa2226 100644 --- a/src/Elastic.NLog.Targets/README.md +++ b/src/Elastic.NLog.Targets/README.md @@ -53,6 +53,7 @@ var logger = LogManager.GetCurrentClassLogger(); - Cloud - Pool seeded with CloudId - _NodeUris_ - URIs of the Elasticsearch nodes in the connection pool (comma delimited) - _CloudId_ - When using NodePoolType = Cloud + - _BootstrapMethod_ - Whether to configure / bootstrap the destination, which requires user has management capabilities (None, Silent, Failure). Default = None * **Export Authentication** - _ApiKey_ - When using NodePoolType = Cloud and authentication via API key. @@ -67,10 +68,16 @@ var logger = LogManager.GetCurrentClassLogger(); - _ExportMaxRetries_ - Max number of times to retry an export. Default = 3 * **Export DataStream** - - _DataStreamType_ - Generic type describing the data. Defaults = 'logs' + - _DataStreamType_ - Generic type describing the data. Default = 'logs' - _DataStreamSet_ - Describes the data ingested and its structure. Default = 'dotnet' - _DataStreamNamespace_ - User-configurable arbitrary grouping. Default = 'default' +* **Export Index** + - _IndexFormat_ - Format string for the Elastic search index (Ex. `dotnet-{0:yyyy.MM.dd}` or blank means disabled). Default = '' + - _IndexOffsetHours_ - Time offset to use for the index (Ex. `0` for UTC or blank means system local). Default = '' + - _IndexOperation_ - Operation header for each bulk operation (Auto, Index, Create). Default = Auto + - _IndexEventId_ - Optional override of the per document `_id` + Notice that export depends on in-memory queue, that is lost on application-crash / -exit. If higher gurantee of delivery is required, then consider using [Elastic.CommonSchema.NLog](https://www.nuget.org/packages/Elastic.CommonSchema.NLog) together with NLog FileTarget and use [filebeat](https://www.elastic.co/beats/filebeat) to ship these logs. diff --git a/tests-integration/Elastic.NLog.Targets.IntegrationTests/LoggingToIndexIngestionTests.cs b/tests-integration/Elastic.NLog.Targets.IntegrationTests/LoggingToIndexIngestionTests.cs new file mode 100644 index 00000000..9befd54c --- /dev/null +++ b/tests-integration/Elastic.NLog.Targets.IntegrationTests/LoggingToIndexIngestionTests.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Channels.Diagnostics; +using Elastic.Clients.Elasticsearch.IndexManagement; +using Elastic.CommonSchema; +using Elastic.Ingest.Elasticsearch; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace NLog.Targets.Elastic.IntegrationTests +{ + public class LoggingToIndexIngestionTests : TestBase + { + public LoggingToIndexIngestionTests(LoggingCluster cluster, ITestOutputHelper output) : base(cluster, output) { } + + [Fact] + public async Task EnsureDocumentsEndUpInIndex() + { + var indexPrefix = "catalog-data-"; + var indexFormat = indexPrefix + "{0:yyyy.MM.dd}"; + + using var _ = CreateLogger(out var logger, out var provider, out var @namespace, out var waitHandle, out var listener, (cfg) => + { + cfg.IndexFormat = indexFormat; + cfg.DataStreamType = "x"; + cfg.DataStreamSet = "dotnet"; + var nodesUris = string.Join(",", Client.ElasticsearchClientSettings.NodePool.Nodes.Select(n => n.Uri.ToString()).ToArray()); + cfg.NodeUris = nodesUris; + cfg.NodePoolType = ElasticPoolType.Static; + }); + + var date = DateTimeOffset.Now; + var indexName = string.Format(indexFormat, date); + + var index = await Client.Indices.GetAsync(new GetIndexRequest(indexName)); + index.Indices.Should().BeNullOrEmpty(); + + logger.Error("an error occurred!"); + + if (!waitHandle.WaitOne(TimeSpan.FromSeconds(10))) + throw new Exception($"No flush occurred in 10 seconds: {listener}", listener.ObservedException); + + listener.PublishSuccess.Should().BeTrue("{0}", listener); + listener.ObservedException.Should().BeNull(); + + var refreshResult = await Client.Indices.RefreshAsync(indexName); + refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); + var searchResult = await Client.SearchAsync(s => s.Indices(indexName)); + searchResult.Total.Should().Be(1); + + var storedDocument = searchResult.Documents.First(); + storedDocument.Message.Should().Be("an error occurred!"); + + var hit = searchResult.Hits.First(); + hit.Index.Should().Be(indexName); + } + } +}