Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
jose3m committed Oct 27, 2016
2 parents e28c06e + 8a49593 commit 665e366
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class DurableGoogleCloudPubSubSink : ILogEventSink, IDisposable

// RollingFileSink instance to manage the error file.
private RollingFileSink _errorsRollingFileSink;

private readonly string CNST_Specifier_Separator = "-";
#endregion


Expand Down Expand Up @@ -93,6 +95,7 @@ public DurableGoogleCloudPubSubSink(GoogleCloudPubSubSinkOptions options)
/// <param name="debugStoreEventSkip">If set to 'true' then skiped events (greater than the BatchSizeLimitBytes) will be stored.</param>
/// <param name="bufferRollingSpecifier">Rolling specifier: {Date}, {Hour} or {HalfHour}. The default one is {Hour}.</param>
/// <param name="errorRetainedFileCountLimit">The maximum number of error log files that will be retained, including the current error file. For unlimited retention, pass null. The default is 31.</param>
/// <param name="debugStoreFileAction">If set to 'true' then all file actions(move forward, delete, ...) will me stored.</param>
/// <returns>LoggerConfiguration object</returns>
/// <exception cref="ArgumentNullException"><paramref name="projectId"/> is <see langword="null" />.</exception>
/// <exception cref="ArgumentNullException"><paramref name="topicId"/> is <see langword="null" />.</exception>
Expand All @@ -119,7 +122,8 @@ public DurableGoogleCloudPubSubSink(
Dictionary<string, string> messageAttrFixed = null,
bool? debugStoreEventSkip = null,
string bufferRollingSpecifier = null,
int? errorRetainedFileCountLimit = null)
int? errorRetainedFileCountLimit = null,
bool? debugStoreFileAction = null)
{

//--- Creating an options object with the received parameters -------------
Expand All @@ -146,7 +150,8 @@ public DurableGoogleCloudPubSubSink(
messageAttrFixed,
debugStoreEventSkip,
bufferRollingSpecifier,
errorRetainedFileCountLimit);
errorRetainedFileCountLimit,
debugStoreFileAction);

//-----

Expand All @@ -173,7 +178,7 @@ private void Initialize(GoogleCloudPubSubSinkOptions options)
if (!string.IsNullOrWhiteSpace(options.ErrorBaseFilename))
{
this._errorsRollingFileSink = new RollingFileSink(
options.ErrorBaseFilename + "-" + options.BufferRollingSpecifier + errorsFileExtension,
options.ErrorBaseFilename + CNST_Specifier_Separator + options.BufferRollingSpecifier + errorsFileExtension,
new GoogleCloudPubSubRawFormatter(), // Formatter for error info (raw).
options.ErrorFileSizeLimitBytes,
options.ErrorRetainedFileCountLimit
Expand All @@ -190,7 +195,7 @@ private void Initialize(GoogleCloudPubSubSinkOptions options)
//--- RollingFileSink to store data to be sent to PubSub ------------------
// It will be generated a file for each day.
this._dataRollingFileSink = new RollingFileSink(
options.BufferBaseFilename + "-" + options.BufferRollingSpecifier + options.BufferFileExtension,
options.BufferBaseFilename + CNST_Specifier_Separator + options.BufferRollingSpecifier + options.BufferFileExtension,
this._state.DurableFormatter, // Formatter for data to insert into the buffer file.
options.BufferFileSizeLimitBytes,
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class GoogleCloudPubSubLogShipper : IDisposable
private static string CNST_Shipper_Debug = "Shipper [Debug]: ";

private static readonly int CNST_NewLineBytes = Encoding.UTF8.GetByteCount(Environment.NewLine);

// Stats for current file.
private int linesSentForCurrentFile = 0;
private int batchsSentForCurrentFile = 0;
#endregion


Expand Down Expand Up @@ -164,20 +168,18 @@ void OnTick()
}

// Anyway, we don't have a buffer file to read from, so we exit without sending data neither updating the bookmark.
if (currentFilePath == null) continue;
if (currentFilePath == null) continue; // >>>>>>>>>>>>>

//--- File date recovery and file extension validation ---
// file name pattern: whatever-xxx-xxx-{date}.{ext}, whatever-xxx-xxx-{date}_1.{ext}, etc.
// lastToken should be something like {date}.{ext} or {date}_1.{ext} now
string lastToken = currentFilePath.Split('-').Last();
if (!lastToken.ToLowerInvariant().EndsWith(this._state.Options.BufferFileExtension))
// Position of the current file in the set.
int currentFileSetPosition = 0;
if (!this.GetFileSetPosition(fileSet, currentFilePath, out currentFileSetPosition))
{
throw new FormatException(string.Format("The file name '{0}' does not seem to follow the right file pattern - it must be named [whatever]-{{Date}}[_n].{extension}", Path.GetFileName(currentFilePath)));
this._state.Error($"The current file (bookmark) does not exist in the file set and I don't know what to do. [{currentFilePath}]");
continue; // >>>>>>>>>>>>>
}
string dateString = lastToken.Substring(0, 8);
DateTime date = DateTime.ParseExact(dateString, "yyyyMMdd", CultureInfo.InvariantCulture);


// It is not done any file date recovery or file extension validation to do not disturb the main app.
// All files that match the template (that are contained in the FileSet) will be considered and managed.


//--- 2nd step : read current buffer file and position ------------------------------------------------------
Expand Down Expand Up @@ -281,15 +283,17 @@ void OnTick()
//--- OK ---
GoogleCloudPubSubLogShipper.WriteBookmark(bookmark, nextLineBeginsAtOffset, currentFilePath);
this._connectionSchedule.MarkSuccess();
this.linesSentForCurrentFile += payloadStr.Count;
this.batchsSentForCurrentFile++;
//---
this._state.Debug($"{CNST_Shipper_Debug} Data sent OK to PubSub.");
this._state.Debug($"{CNST_Shipper_Debug} OK sending data to PubSub.");
}
else
{
//--- ERROR ---
this._connectionSchedule.MarkFailure();
//---
auxMessage = $"{CNST_Shipper_Error} Data sent ERROR to PubSub. [{response.ErrorMessage}]";
auxMessage = $"{CNST_Shipper_Error} ERROR sending data to PubSub. [{response.ErrorMessage}]";
SelfLog.WriteLine(auxMessage);
this._state.Error(auxMessage, payloadStr);
break;
Expand All @@ -314,12 +318,18 @@ void OnTick()
// regular interval, so mark the attempt as successful.
this._connectionSchedule.MarkSuccess();

// Only advance the bookmark if no other process has the
// current file locked, and its length is as we found it.
if (fileSet.Length == 2 && fileSet.First() == currentFilePath && IsUnlockedAtLength(currentFilePath, nextLineBeginsAtOffset, this._state))
// Only advance the bookmark if no other process has the current file locked, and its length is as we found it
// and there is another next file.
int nextFileSetPosition = currentFileSetPosition + 1;
if (nextFileSetPosition < fileSet.Length && IsUnlockedAtLength(currentFilePath, nextLineBeginsAtOffset, this._state))
{
this._state.Debug($"{CNST_Shipper_Debug} Move forward to next file. [{fileSet[1]}].");
GoogleCloudPubSubLogShipper.WriteBookmark(bookmark, 0, fileSet[1]);
// --- Move to next file --------------------------------------------------
this._state.DebugFileAction($"{CNST_Shipper_Debug} File finished: Lines=[{this.linesSentForCurrentFile}] Batchs=[{this.batchsSentForCurrentFile}] File=[{currentFilePath}]");
this._state.DebugFileAction($"{CNST_Shipper_Debug} Move forward to next file. [{fileSet[nextFileSetPosition]}].");
GoogleCloudPubSubLogShipper.WriteBookmark(bookmark, 0, fileSet[nextFileSetPosition]);
this.linesSentForCurrentFile = 0;
this.batchsSentForCurrentFile = 0;
//-------------------------------------------------------------------------
}

//if (fileSet.Length > 2)
Expand All @@ -336,9 +346,9 @@ void OnTick()
//--- Retained File Count Limit --------------
// If necessary, one obsolete file is deleted each time.
// It is done even there is or not data to send: it is possible that our application is sending data at any time.
if (fileSet.Length > 2 && fileSet.Length > this._retainedFileCountLimit && fileSet.First() != currentFilePath)
if (fileSet.Length > 2 && fileSet.Length > this._retainedFileCountLimit && currentFileSetPosition > 0)
{
this._state.Debug($"{CNST_Shipper_Debug} File delete. [{fileSet[0]}].");
this._state.DebugFileAction($"{CNST_Shipper_Debug} File delete. [{fileSet[0]}].");
System.IO.File.Delete(fileSet[0]);
}
//--------------------------------------------
Expand Down Expand Up @@ -449,7 +459,7 @@ static bool TryReadLine(Stream current, ref long nextStart, out string nextLine,
}


static void TryReadBookmark(Stream bookmark, out long nextLineBeginsAtOffset, out string currentFile)
static void TryReadBookmark(Stream bookmark, out long nextLineBeginsAtOffset, out string currentFile)
{
nextLineBeginsAtOffset = 0;
currentFile = null;
Expand Down Expand Up @@ -482,6 +492,27 @@ string[] GetFileSet()
.OrderBy(n => n)
.ToArray();
}

bool GetFileSetPosition(string[] fileSet, string filePath, out int fileSetPosition)
{
fileSetPosition = 0;

if (fileSet != null)
{
foreach (string f in fileSet)
{
if (fileSet[fileSetPosition] == filePath)
{
return true;
}

fileSetPosition++;
}
}

return false;
}

#endregion


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ public class GoogleCloudPubSubSinkOptions
/// </summary>
public int? ErrorRetainedFileCountLimit { get; set; }

/// <summary>
/// If set to 'true' then all file actions (move forward, delete, ...) will me stored.
/// </summary>
public bool DebugStoreFileAction { get; set; } = false;

#endregion


Expand Down Expand Up @@ -274,6 +279,7 @@ public GoogleCloudPubSubSinkOptions(string projectId, string topicId) : this()
/// <param name="debugStoreEventSkip">If set to 'true' then skiped events (greater than the BatchSizeLimitBytes) will be stored.</param>
/// <param name="bufferRollingSpecifier">Rolling specifier: {Date}, {Hour} or {HalfHour}. The default one is {Hour}.</param>
/// <param name="errorRetainedFileCountLimit">The maximum number of error log files that will be retained, including the current error file. For unlimited retention, pass null. The default is 31.</param>
/// <param name="debugStoreFileAction">If set to 'true' then all file actions (move forward, delete, ...) will me stored.</param>
public void SetValues(
string bufferBaseFilename,
long? bufferFileSizeLimitBytes = null,
Expand All @@ -294,7 +300,8 @@ public void SetValues(
Dictionary<string, string> messageAttrFixed = null,
bool? debugStoreEventSkip = null,
string bufferRollingSpecifier = null,
int? errorRetainedFileCountLimit = null)
int? errorRetainedFileCountLimit = null,
bool? debugStoreFileAction = null)
{
this.BufferBaseFilename = bufferBaseFilename;
this.ErrorBaseFilename = errorBaseFilename;
Expand Down Expand Up @@ -346,6 +353,9 @@ public void SetValues(
if (errorRetainedFileCountLimit != null)
this.ErrorRetainedFileCountLimit = errorRetainedFileCountLimit.Value;

if (debugStoreFileAction != null)
this.DebugStoreFileAction = debugStoreFileAction.Value;

//---

if (messageDataToBase64 != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,16 @@ public void DebugEventSkip(string message, string eventData)

//---

public void DebugFileAction(string message)
{
if (this.Options.DebugStoreAll || this.Options.DebugStoreFileAction)
{
this._ErrorDebugStore(message, null, false);
}
}

//---

public void _ErrorDebugStore(string message, List<string> payloadStr, bool savePayload)
{
// This method stores an error or debug information (if necessary).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static class LoggerConfigurationGoogleCloudPubSubExtensions
/// <param name="debugStoreEventSkip">If set to 'true' then skiped events (greater than the BatchSizeLimitBytes) will be stored.</param>
/// <param name="bufferRollingSpecifier">Rolling specifier: {Date}, {Hour} or {HalfHour}. The default one is {Hour}.</param>
/// <param name="errorRetainedFileCountLimit">The maximum number of error log files that will be retained, including the current error file. For unlimited retention, pass null. The default is 31.</param>
/// ErrorRetainedFileCountLimit
/// <param name="debugStoreFileAction">If set to 'true' then all file actions (move forward, delete, ...) will me stored.</param>
/// <returns>LoggerConfiguration object</returns>
/// <exception cref="ArgumentNullException"><paramref name="projectId"/> is <see langword="null" />.</exception>
/// <exception cref="ArgumentNullException"><paramref name="topicId"/> is <see langword="null" />.</exception>
Expand Down Expand Up @@ -93,7 +93,8 @@ public static LoggerConfiguration GoogleCloudPubSub(
Dictionary<string, string> messageAttrFixed = null,
bool? debugStoreEventSkip = null,
string bufferRollingSpecifier = null,
int? errorRetainedFileCountLimit = null)
int? errorRetainedFileCountLimit = null,
bool? debugStoreFileAction = null)
{

//--- Creating an options object with the received parameters -------------
Expand All @@ -120,7 +121,8 @@ public static LoggerConfiguration GoogleCloudPubSub(
messageAttrFixed,
debugStoreEventSkip,
bufferRollingSpecifier,
errorRetainedFileCountLimit);
errorRetainedFileCountLimit,
debugStoreFileAction);


//--- Mandatory parameters ------------
Expand Down
6 changes: 3 additions & 3 deletions src/Serilog.Sinks.GoogleCloudPubSub/project.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "2.0.1-*",
"version": "2.0.2-*",
"description": "The Google Cloud Pub/Sub Sink for Serilog",
"authors": ["Oscar Pérez, XMLTravelgate CTO"],
"packOptions": {
Expand All @@ -16,15 +16,15 @@
"dependencies": {
"Serilog": "2.3.0",
"Serilog.Sinks.PeriodicBatching": "2.0.0",
"Serilog.Sinks.RollingFile": "3.2.0-dev-00753",
"Serilog.Sinks.RollingFile": "3.2.0",
"Google.Pubsub.V1": "1.0.0-beta02"
}
},
"net4.6": {
"dependencies": {
"Serilog": "2.3.0",
"Serilog.Sinks.PeriodicBatching": "2.0.0",
"Serilog.Sinks.RollingFile": "3.2.0-dev-00753",
"Serilog.Sinks.RollingFile": "3.2.0",
"Google.Pubsub.V1": "1.0.0-beta02"
}
}
Expand Down

0 comments on commit 665e366

Please sign in to comment.