Skip to content

Commit

Permalink
Metrics Collector: Roll back configurable TransportType + bug fix (#…
Browse files Browse the repository at this point in the history
…6665)

Three conceptual changes:
1. Rolling support back for configurable `TransportType` as MetricsCollector is a mature product not expected to take on new features like this. Configurable transport type will not be supported in our future products for collecting metrics, so don't want to mislead customers. That change was in this PR:
#6656
2. Keeping the bug fix from that PR though. Bug description from changelog: 
> "Fix bug where failing connection to EdgeHub blocks independent AzureMonitor upload path".
3. Fixing bug where module won't shutdown if UploadTarget is IoTMessage and client cannot be recreated. Now it will shutdown in this case to be restarted by EdgeHub. If UploadTarget is AzureMonitor and client cannot be created, this is ignored and won't effect MetricsCollector operation as no metrics will be sent. In order to achieve this I have created a composition pattern around ModuleClientWrapper where there are separate implementations for AzureMonitor and IoTMessage upload paths.



## Azure IoT Edge PR checklist:
  • Loading branch information
and-rewsmith authored Sep 13, 2022
1 parent e171a1f commit 14c78fe
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 144 deletions.
5 changes: 2 additions & 3 deletions edge-modules/metrics-collector/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
# 1.1.0 (2022-09-02)
### Features
* Add support for configurable transport type [b39d6c5](https://github.com/azure/iotedge/commit/b39d6c5c295cb81a9150141ace9a9909bc2b3686)
# 1.1.0 (2022-09-09)

### Bug Fixes
* Fix bug where failing connection to Edge Hub blocks independent AzureMonitor upload path [b39d6c5](https://github.com/azure/iotedge/commit/b39d6c5c295cb81a9150141ace9a9909bc2b3686)
* Update SharpZipLib and Newtonsoft.Json to patch security vulnerability [1b483e4](https://github.com/Azure/iotedge/commit/1b483e4f114593b9cb40be65598c83bea6811444)
* Update base image to include .NET security fixes from [.NET Core 3.1.28 - August 9, 2022](https://github.com/dotnet/core/blob/main/release-notes/3.1/3.1.28/3.1.28.md)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.IotHubMetricsUpload
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.Devices.Edge.Util;
using ModuleClientWrapper;

public class IotHubMetricsUpload : IMetricsPublisher
{
const string IdentifierPropertyName = "id";
readonly ModuleClientWrapper ModuleClientWrapper;
readonly IModuleClientWrapper ModuleClientWrapper;

public IotHubMetricsUpload(ModuleClientWrapper moduleClientWrapper)
public IotHubMetricsUpload(IModuleClientWrapper moduleClientWrapper)
{
this.ModuleClientWrapper = Preconditions.CheckNotNull(moduleClientWrapper, nameof(moduleClientWrapper));
}
Expand Down Expand Up @@ -47,9 +48,8 @@ public async Task<bool> PublishAsync(IEnumerable<Metric> metrics, CancellationTo
Message metricsMessage = new Message(metricsData);
metricsMessage.Properties[IdentifierPropertyName] = Constants.IoTUploadMessageIdentifier;

await this.ModuleClientWrapper.SendMessage("metricOutput", metricsMessage);
await this.ModuleClientWrapper.SendMessageAsync("metricOutput", metricsMessage);

LoggerUtil.Writer.LogInformation("Successfully sent metrics via IoT message");
return true;
}
catch (Exception e)
Expand Down
102 changes: 0 additions & 102 deletions edge-modules/metrics-collector/src/ModuleClientWrapper.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper
{
public class AzureMonitorClientWrapper : IDisposable, IModuleClientWrapper
{
Option<BasicModuleClientWrapper> inner;

public AzureMonitorClientWrapper(Option<BasicModuleClientWrapper> basicModuleClientWrapper)
{
this.inner = basicModuleClientWrapper;
}

public static Task<AzureMonitorClientWrapper> BuildModuleClientWrapperAsync()
{
// Don't try to initialize client so this call won't block.
return Task.FromResult(new AzureMonitorClientWrapper(Option.None<BasicModuleClientWrapper>()));
}

public async Task RecreateClientAsync()
{
await this.inner.Match(async (BasicModuleClientWrapper) =>
{
try
{
await BasicModuleClientWrapper.RecreateClientAsync();
}
catch (Exception)
{
this.inner = Option.None<BasicModuleClientWrapper>();
}
}, async () =>
{
try
{
this.inner = Option.Some(await BasicModuleClientWrapper.BuildModuleClientWrapperAsync());
}
catch (Exception)
{
this.inner = Option.None<BasicModuleClientWrapper>();
}
});
}

public Task SendMessageAsync(string outputName, Message message)
{
throw new Exception("Not expected to send metrics to IoT Hub when upload target is AzureMonitor");
}

public void Dispose()
{
this.inner.ForEach((basicModuleClientWrapper) =>
{
basicModuleClientWrapper.Dispose();
});
}
}
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper
{
public class BasicModuleClientWrapper : IDisposable, IModuleClientWrapper
{
ModuleClient inner;
SemaphoreSlim moduleClientLock;

public BasicModuleClientWrapper(ModuleClient moduleClient, SemaphoreSlim moduleClientLock)
{
this.inner = moduleClient;
this.moduleClientLock = Preconditions.CheckNotNull(moduleClientLock);
}

public static async Task<BasicModuleClientWrapper> BuildModuleClientWrapperAsync()
{
SemaphoreSlim moduleClientLock = new SemaphoreSlim(1, 1);
ModuleClient moduleClient = await InitializeModuleClientAsync();
return new BasicModuleClientWrapper(moduleClient, moduleClientLock);
}

public async Task RecreateClientAsync()
{

await this.moduleClientLock.WaitAsync();

try
{
this.inner.Dispose();
this.inner = await InitializeModuleClientAsync();
LoggerUtil.Writer.LogInformation("Closed and re-established connection to IoT Hub");
}
catch (Exception)
{
this.moduleClientLock.Release();
throw;
}

this.moduleClientLock.Release();
}

public async Task SendMessageAsync(string outputName, Message message)
{
await this.moduleClientLock.WaitAsync();

try
{
await this.inner.SendEventAsync(outputName, message);
LoggerUtil.Writer.LogInformation("Successfully sent metrics via IoT message");
}
catch (Exception)
{
this.moduleClientLock.Release();
throw;
}

this.moduleClientLock.Release();
}

public void Dispose()
{
this.inner.Dispose();
this.moduleClientLock.Dispose();
}

static async Task<ModuleClient> InitializeModuleClientAsync()
{
TransportType transportType = TransportType.Amqp_Tcp_Only;
LoggerUtil.Writer.LogInformation($"Trying to initialize module client using transport type [{transportType}]");

ITransportSettings[] settings = new ITransportSettings[] { new AmqpTransportSettings(transportType) };
ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
moduleClient.ProductInfo = Constants.ProductInfo;

await moduleClient.OpenAsync();
LoggerUtil.Writer.LogInformation($"Successfully initialized module client using transport type [{transportType}]");
return moduleClient;
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper
{
public class IotMessageModuleClientWrapper : IDisposable, IModuleClientWrapper
{
BasicModuleClientWrapper inner;
CancellationTokenSource cancellationTokenSource;

public IotMessageModuleClientWrapper(BasicModuleClientWrapper basicModuleClientWrapper, CancellationTokenSource cts)
{
this.inner = basicModuleClientWrapper;
this.cancellationTokenSource = cts;
}

public static async Task<IModuleClientWrapper> BuildModuleClientWrapperAsync(CancellationTokenSource cts)
{
BasicModuleClientWrapper basicModuleClientWrapper = await BasicModuleClientWrapper.BuildModuleClientWrapperAsync();
return new IotMessageModuleClientWrapper(basicModuleClientWrapper, cts);
}

public async Task RecreateClientAsync()
{
try
{
await this.inner.RecreateClientAsync();
}
catch (Exception e)
{
LoggerUtil.Writer.LogError($"Failed closing and re-establishing connection to IoT Hub: {e.ToString()}");
this.cancellationTokenSource.Cancel();
}
}

public async Task SendMessageAsync(string outputName, Message message)
{
try
{
await this.inner.SendMessageAsync(outputName, message);
}
catch (Exception e)
{
LoggerUtil.Writer.LogError($"Failed sending metrics as IoT message: {e.ToString()}");
}
}

public void Dispose()
{
this.inner.Dispose();
}
}
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper
{
public interface IModuleClientWrapper
{
Task RecreateClientAsync();
Task SendMessageAsync(string outputName, Message message);

}
}
Loading

0 comments on commit 14c78fe

Please sign in to comment.