From 8a4959321e98265606c617e6f27423e09b125053 Mon Sep 17 00:00:00 2001 From: jmartin Date: Thu, 27 Oct 2016 17:07:10 +0200 Subject: [PATCH] Fix: corrected problem when changing of buffer file. --- .../DurableGoogleCloudPubSubSink.cs | 13 ++-- .../GoogleCloudPubSubLogShipper.cs | 71 +++++++++++++------ .../GoogleCloudPubSubSinkOptions.cs | 12 +++- .../GoogleCloudPubSubSinkState.cs | 10 +++ ...onfigurationGoogleCloudPubSubExtensions.cs | 8 ++- .../project.json | 6 +- 6 files changed, 89 insertions(+), 31 deletions(-) diff --git a/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/DurableGoogleCloudPubSubSink.cs b/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/DurableGoogleCloudPubSubSink.cs index 480f9ad..64b7fae 100644 --- a/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/DurableGoogleCloudPubSubSink.cs +++ b/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/DurableGoogleCloudPubSubSink.cs @@ -42,6 +42,8 @@ public class DurableGoogleCloudPubSubSink : ILogEventSink, IDisposable // RollingFileSink instance to manage the error file. private RollingFileSink _errorsRollingFileSink; + + private readonly string CNST_Specifier_Separator = "-"; #endregion @@ -93,6 +95,7 @@ public DurableGoogleCloudPubSubSink(GoogleCloudPubSubSinkOptions options) /// If set to 'true' then skiped events (greater than the BatchSizeLimitBytes) will be stored. /// Rolling specifier: {Date}, {Hour} or {HalfHour}. The default one is {Hour}. /// The maximum number of error log files that will be retained, including the current error file. For unlimited retention, pass null. The default is 31. + /// If set to 'true' then all file actions(move forward, delete, ...) will me stored. /// LoggerConfiguration object /// is . /// is . @@ -119,7 +122,8 @@ public DurableGoogleCloudPubSubSink( Dictionary messageAttrFixed = null, bool? debugStoreEventSkip = null, string bufferRollingSpecifier = null, - int? errorRetainedFileCountLimit = null) + int? errorRetainedFileCountLimit = null, + bool? debugStoreFileAction = null) { //--- Creating an options object with the received parameters ------------- @@ -146,7 +150,8 @@ public DurableGoogleCloudPubSubSink( messageAttrFixed, debugStoreEventSkip, bufferRollingSpecifier, - errorRetainedFileCountLimit); + errorRetainedFileCountLimit, + debugStoreFileAction); //----- @@ -173,7 +178,7 @@ private void Initialize(GoogleCloudPubSubSinkOptions options) if (!string.IsNullOrWhiteSpace(options.ErrorBaseFilename)) { this._errorsRollingFileSink = new RollingFileSink( - options.ErrorBaseFilename + "-" + options.BufferRollingSpecifier + errorsFileExtension, + options.ErrorBaseFilename + CNST_Specifier_Separator + options.BufferRollingSpecifier + errorsFileExtension, new GoogleCloudPubSubRawFormatter(), // Formatter for error info (raw). options.ErrorFileSizeLimitBytes, options.ErrorRetainedFileCountLimit @@ -190,7 +195,7 @@ private void Initialize(GoogleCloudPubSubSinkOptions options) //--- RollingFileSink to store data to be sent to PubSub ------------------ // It will be generated a file for each day. this._dataRollingFileSink = new RollingFileSink( - options.BufferBaseFilename + "-" + options.BufferRollingSpecifier + options.BufferFileExtension, + options.BufferBaseFilename + CNST_Specifier_Separator + options.BufferRollingSpecifier + options.BufferFileExtension, this._state.DurableFormatter, // Formatter for data to insert into the buffer file. options.BufferFileSizeLimitBytes, null diff --git a/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubLogShipper.cs b/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubLogShipper.cs index bbcbf89..5b5211f 100644 --- a/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubLogShipper.cs +++ b/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubLogShipper.cs @@ -49,6 +49,10 @@ class GoogleCloudPubSubLogShipper : IDisposable private static string CNST_Shipper_Debug = "Shipper [Debug]: "; private static readonly int CNST_NewLineBytes = Encoding.UTF8.GetByteCount(Environment.NewLine); + + // Stats for current file. + private int linesSentForCurrentFile = 0; + private int batchsSentForCurrentFile = 0; #endregion @@ -164,20 +168,18 @@ void OnTick() } // Anyway, we don't have a buffer file to read from, so we exit without sending data neither updating the bookmark. - if (currentFilePath == null) continue; + if (currentFilePath == null) continue; // >>>>>>>>>>>>> - //--- File date recovery and file extension validation --- - // file name pattern: whatever-xxx-xxx-{date}.{ext}, whatever-xxx-xxx-{date}_1.{ext}, etc. - // lastToken should be something like {date}.{ext} or {date}_1.{ext} now - string lastToken = currentFilePath.Split('-').Last(); - if (!lastToken.ToLowerInvariant().EndsWith(this._state.Options.BufferFileExtension)) + // Position of the current file in the set. + int currentFileSetPosition = 0; + if (!this.GetFileSetPosition(fileSet, currentFilePath, out currentFileSetPosition)) { - throw new FormatException(string.Format("The file name '{0}' does not seem to follow the right file pattern - it must be named [whatever]-{{Date}}[_n].{extension}", Path.GetFileName(currentFilePath))); + this._state.Error($"The current file (bookmark) does not exist in the file set and I don't know what to do. [{currentFilePath}]"); + continue; // >>>>>>>>>>>>> } - string dateString = lastToken.Substring(0, 8); - DateTime date = DateTime.ParseExact(dateString, "yyyyMMdd", CultureInfo.InvariantCulture); - + // It is not done any file date recovery or file extension validation to do not disturb the main app. + // All files that match the template (that are contained in the FileSet) will be considered and managed. //--- 2nd step : read current buffer file and position ------------------------------------------------------ @@ -281,15 +283,17 @@ void OnTick() //--- OK --- GoogleCloudPubSubLogShipper.WriteBookmark(bookmark, nextLineBeginsAtOffset, currentFilePath); this._connectionSchedule.MarkSuccess(); + this.linesSentForCurrentFile += payloadStr.Count; + this.batchsSentForCurrentFile++; //--- - this._state.Debug($"{CNST_Shipper_Debug} Data sent OK to PubSub."); + this._state.Debug($"{CNST_Shipper_Debug} OK sending data to PubSub."); } else { //--- ERROR --- this._connectionSchedule.MarkFailure(); //--- - auxMessage = $"{CNST_Shipper_Error} Data sent ERROR to PubSub. [{response.ErrorMessage}]"; + auxMessage = $"{CNST_Shipper_Error} ERROR sending data to PubSub. [{response.ErrorMessage}]"; SelfLog.WriteLine(auxMessage); this._state.Error(auxMessage, payloadStr); break; @@ -314,12 +318,18 @@ void OnTick() // regular interval, so mark the attempt as successful. this._connectionSchedule.MarkSuccess(); - // Only advance the bookmark if no other process has the - // current file locked, and its length is as we found it. - if (fileSet.Length == 2 && fileSet.First() == currentFilePath && IsUnlockedAtLength(currentFilePath, nextLineBeginsAtOffset, this._state)) + // Only advance the bookmark if no other process has the current file locked, and its length is as we found it + // and there is another next file. + int nextFileSetPosition = currentFileSetPosition + 1; + if (nextFileSetPosition < fileSet.Length && IsUnlockedAtLength(currentFilePath, nextLineBeginsAtOffset, this._state)) { - this._state.Debug($"{CNST_Shipper_Debug} Move forward to next file. [{fileSet[1]}]."); - GoogleCloudPubSubLogShipper.WriteBookmark(bookmark, 0, fileSet[1]); + // --- Move to next file -------------------------------------------------- + this._state.DebugFileAction($"{CNST_Shipper_Debug} File finished: Lines=[{this.linesSentForCurrentFile}] Batchs=[{this.batchsSentForCurrentFile}] File=[{currentFilePath}]"); + this._state.DebugFileAction($"{CNST_Shipper_Debug} Move forward to next file. [{fileSet[nextFileSetPosition]}]."); + GoogleCloudPubSubLogShipper.WriteBookmark(bookmark, 0, fileSet[nextFileSetPosition]); + this.linesSentForCurrentFile = 0; + this.batchsSentForCurrentFile = 0; + //------------------------------------------------------------------------- } //if (fileSet.Length > 2) @@ -336,9 +346,9 @@ void OnTick() //--- Retained File Count Limit -------------- // If necessary, one obsolete file is deleted each time. // It is done even there is or not data to send: it is possible that our application is sending data at any time. - if (fileSet.Length > 2 && fileSet.Length > this._retainedFileCountLimit && fileSet.First() != currentFilePath) + if (fileSet.Length > 2 && fileSet.Length > this._retainedFileCountLimit && currentFileSetPosition > 0) { - this._state.Debug($"{CNST_Shipper_Debug} File delete. [{fileSet[0]}]."); + this._state.DebugFileAction($"{CNST_Shipper_Debug} File delete. [{fileSet[0]}]."); System.IO.File.Delete(fileSet[0]); } //-------------------------------------------- @@ -449,7 +459,7 @@ static bool TryReadLine(Stream current, ref long nextStart, out string nextLine, } - static void TryReadBookmark(Stream bookmark, out long nextLineBeginsAtOffset, out string currentFile) + static void TryReadBookmark(Stream bookmark, out long nextLineBeginsAtOffset, out string currentFile) { nextLineBeginsAtOffset = 0; currentFile = null; @@ -482,6 +492,27 @@ string[] GetFileSet() .OrderBy(n => n) .ToArray(); } + + bool GetFileSetPosition(string[] fileSet, string filePath, out int fileSetPosition) + { + fileSetPosition = 0; + + if (fileSet != null) + { + foreach (string f in fileSet) + { + if (fileSet[fileSetPosition] == filePath) + { + return true; + } + + fileSetPosition++; + } + } + + return false; + } + #endregion diff --git a/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubSinkOptions.cs b/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubSinkOptions.cs index 8508ffb..8ae26d5 100644 --- a/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubSinkOptions.cs +++ b/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubSinkOptions.cs @@ -161,6 +161,11 @@ public class GoogleCloudPubSubSinkOptions /// public int? ErrorRetainedFileCountLimit { get; set; } + /// + /// If set to 'true' then all file actions (move forward, delete, ...) will me stored. + /// + public bool DebugStoreFileAction { get; set; } = false; + #endregion @@ -274,6 +279,7 @@ public GoogleCloudPubSubSinkOptions(string projectId, string topicId) : this() /// If set to 'true' then skiped events (greater than the BatchSizeLimitBytes) will be stored. /// Rolling specifier: {Date}, {Hour} or {HalfHour}. The default one is {Hour}. /// The maximum number of error log files that will be retained, including the current error file. For unlimited retention, pass null. The default is 31. + /// If set to 'true' then all file actions (move forward, delete, ...) will me stored. public void SetValues( string bufferBaseFilename, long? bufferFileSizeLimitBytes = null, @@ -294,7 +300,8 @@ public void SetValues( Dictionary messageAttrFixed = null, bool? debugStoreEventSkip = null, string bufferRollingSpecifier = null, - int? errorRetainedFileCountLimit = null) + int? errorRetainedFileCountLimit = null, + bool? debugStoreFileAction = null) { this.BufferBaseFilename = bufferBaseFilename; this.ErrorBaseFilename = errorBaseFilename; @@ -346,6 +353,9 @@ public void SetValues( if (errorRetainedFileCountLimit != null) this.ErrorRetainedFileCountLimit = errorRetainedFileCountLimit.Value; + if (debugStoreFileAction != null) + this.DebugStoreFileAction = debugStoreFileAction.Value; + //--- if (messageDataToBase64 != null) diff --git a/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubSinkState.cs b/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubSinkState.cs index 3f8c42c..6bfe660 100644 --- a/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubSinkState.cs +++ b/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/GoogleCloudPubSub/GoogleCloudPubSubSinkState.cs @@ -360,6 +360,16 @@ public void DebugEventSkip(string message, string eventData) //--- + public void DebugFileAction(string message) + { + if (this.Options.DebugStoreAll || this.Options.DebugStoreFileAction) + { + this._ErrorDebugStore(message, null, false); + } + } + + //--- + public void _ErrorDebugStore(string message, List payloadStr, bool savePayload) { // This method stores an error or debug information (if necessary). diff --git a/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/LoggerConfigurationGoogleCloudPubSubExtensions.cs b/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/LoggerConfigurationGoogleCloudPubSubExtensions.cs index 3109804..719a9db 100644 --- a/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/LoggerConfigurationGoogleCloudPubSubExtensions.cs +++ b/src/Serilog.Sinks.GoogleCloudPubSub/Sinks/LoggerConfigurationGoogleCloudPubSubExtensions.cs @@ -65,7 +65,7 @@ public static class LoggerConfigurationGoogleCloudPubSubExtensions /// If set to 'true' then skiped events (greater than the BatchSizeLimitBytes) will be stored. /// Rolling specifier: {Date}, {Hour} or {HalfHour}. The default one is {Hour}. /// The maximum number of error log files that will be retained, including the current error file. For unlimited retention, pass null. The default is 31. - /// ErrorRetainedFileCountLimit + /// If set to 'true' then all file actions (move forward, delete, ...) will me stored. /// LoggerConfiguration object /// is . /// is . @@ -93,7 +93,8 @@ public static LoggerConfiguration GoogleCloudPubSub( Dictionary messageAttrFixed = null, bool? debugStoreEventSkip = null, string bufferRollingSpecifier = null, - int? errorRetainedFileCountLimit = null) + int? errorRetainedFileCountLimit = null, + bool? debugStoreFileAction = null) { //--- Creating an options object with the received parameters ------------- @@ -120,7 +121,8 @@ public static LoggerConfiguration GoogleCloudPubSub( messageAttrFixed, debugStoreEventSkip, bufferRollingSpecifier, - errorRetainedFileCountLimit); + errorRetainedFileCountLimit, + debugStoreFileAction); //--- Mandatory parameters ------------ diff --git a/src/Serilog.Sinks.GoogleCloudPubSub/project.json b/src/Serilog.Sinks.GoogleCloudPubSub/project.json index 28f8d76..86668ef 100644 --- a/src/Serilog.Sinks.GoogleCloudPubSub/project.json +++ b/src/Serilog.Sinks.GoogleCloudPubSub/project.json @@ -1,5 +1,5 @@ { - "version": "2.0.1-*", + "version": "2.0.2-*", "description": "The Google Cloud Pub/Sub Sink for Serilog", "authors": ["Oscar PĂ©rez, XMLTravelgate CTO"], "packOptions": { @@ -16,7 +16,7 @@ "dependencies": { "Serilog": "2.3.0", "Serilog.Sinks.PeriodicBatching": "2.0.0", - "Serilog.Sinks.RollingFile": "3.2.0-dev-00753", + "Serilog.Sinks.RollingFile": "3.2.0", "Google.Pubsub.V1": "1.0.0-beta02" } }, @@ -24,7 +24,7 @@ "dependencies": { "Serilog": "2.3.0", "Serilog.Sinks.PeriodicBatching": "2.0.0", - "Serilog.Sinks.RollingFile": "3.2.0-dev-00753", + "Serilog.Sinks.RollingFile": "3.2.0", "Google.Pubsub.V1": "1.0.0-beta02" } }