From 14c78fea9de250ef54ef6129e36f55587726dd46 Mon Sep 17 00:00:00 2001 From: Andrew Smith Date: Tue, 13 Sep 2022 16:33:38 -0700 Subject: [PATCH] Metrics Collector: Roll back configurable `TransportType` + bug fix (#6665) Three conceptual changes: 1. Rolling support back for configurable `TransportType` as MetricsCollector is a mature product not expected to take on new features like this. Configurable transport type will not be supported in our future products for collecting metrics, so don't want to mislead customers. That change was in this PR: https://github.com/Azure/iotedge/pull/6656 2. Keeping the bug fix from that PR though. Bug description from changelog: > "Fix bug where failing connection to EdgeHub blocks independent AzureMonitor upload path". 3. Fixing bug where module won't shutdown if UploadTarget is IoTMessage and client cannot be recreated. Now it will shutdown in this case to be restarted by EdgeHub. If UploadTarget is AzureMonitor and client cannot be created, this is ignored and won't effect MetricsCollector operation as no metrics will be sent. In order to achieve this I have created a composition pattern around ModuleClientWrapper where there are separate implementations for AzureMonitor and IoTMessage upload paths. ## Azure IoT Edge PR checklist: --- edge-modules/metrics-collector/CHANGELOG.md | 5 +- .../src/IotHubUpload/IotHubUpload.cs | 8 +- .../src/ModuleClientWrapper.cs | 102 ------------------ .../AzureMonitorModuleClientWrapper.cs | 66 ++++++++++++ .../BasicModuleClientWrapper.cs | 87 +++++++++++++++ .../IotMessageModuleClientWrapper.cs | 60 +++++++++++ .../ModuleClientWrapper.cs | 16 +++ edge-modules/metrics-collector/src/Program.cs | 37 +++---- edge-modules/metrics-collector/src/README.md | 3 - .../metrics-collector/src/Settings.cs | 15 +-- 10 files changed, 255 insertions(+), 144 deletions(-) delete mode 100644 edge-modules/metrics-collector/src/ModuleClientWrapper.cs create mode 100644 edge-modules/metrics-collector/src/ModuleClientWrapper/AzureMonitorModuleClientWrapper.cs create mode 100644 edge-modules/metrics-collector/src/ModuleClientWrapper/BasicModuleClientWrapper.cs create mode 100644 edge-modules/metrics-collector/src/ModuleClientWrapper/IotMessageModuleClientWrapper.cs create mode 100644 edge-modules/metrics-collector/src/ModuleClientWrapper/ModuleClientWrapper.cs diff --git a/edge-modules/metrics-collector/CHANGELOG.md b/edge-modules/metrics-collector/CHANGELOG.md index 41bf83bf4db..e9c3a374fdc 100644 --- a/edge-modules/metrics-collector/CHANGELOG.md +++ b/edge-modules/metrics-collector/CHANGELOG.md @@ -1,8 +1,7 @@ -# 1.1.0 (2022-09-02) -### Features -* Add support for configurable transport type [b39d6c5](https://github.com/azure/iotedge/commit/b39d6c5c295cb81a9150141ace9a9909bc2b3686) +# 1.1.0 (2022-09-09) ### Bug Fixes +* Fix bug where failing connection to Edge Hub blocks independent AzureMonitor upload path [b39d6c5](https://github.com/azure/iotedge/commit/b39d6c5c295cb81a9150141ace9a9909bc2b3686) * Update SharpZipLib and Newtonsoft.Json to patch security vulnerability [1b483e4](https://github.com/Azure/iotedge/commit/1b483e4f114593b9cb40be65598c83bea6811444) * Update base image to include .NET security fixes from [.NET Core 3.1.28 - August 9, 2022](https://github.com/dotnet/core/blob/main/release-notes/3.1/3.1.28/3.1.28.md) diff --git a/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs b/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs index 33677f44d50..3a4feecdcd8 100644 --- a/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs +++ b/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs @@ -13,13 +13,14 @@ namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.IotHubMetricsUpload using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Microsoft.Azure.Devices.Edge.Util; + using ModuleClientWrapper; public class IotHubMetricsUpload : IMetricsPublisher { const string IdentifierPropertyName = "id"; - readonly ModuleClientWrapper ModuleClientWrapper; + readonly IModuleClientWrapper ModuleClientWrapper; - public IotHubMetricsUpload(ModuleClientWrapper moduleClientWrapper) + public IotHubMetricsUpload(IModuleClientWrapper moduleClientWrapper) { this.ModuleClientWrapper = Preconditions.CheckNotNull(moduleClientWrapper, nameof(moduleClientWrapper)); } @@ -47,9 +48,8 @@ public async Task PublishAsync(IEnumerable metrics, CancellationTo Message metricsMessage = new Message(metricsData); metricsMessage.Properties[IdentifierPropertyName] = Constants.IoTUploadMessageIdentifier; - await this.ModuleClientWrapper.SendMessage("metricOutput", metricsMessage); + await this.ModuleClientWrapper.SendMessageAsync("metricOutput", metricsMessage); - LoggerUtil.Writer.LogInformation("Successfully sent metrics via IoT message"); return true; } catch (Exception e) diff --git a/edge-modules/metrics-collector/src/ModuleClientWrapper.cs b/edge-modules/metrics-collector/src/ModuleClientWrapper.cs deleted file mode 100644 index f7f47250f3a..00000000000 --- a/edge-modules/metrics-collector/src/ModuleClientWrapper.cs +++ /dev/null @@ -1,102 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Azure.Devices.Client; -using Microsoft.Azure.Devices.Client.Transport.Mqtt; -using Microsoft.Azure.Devices.Edge.Util; -using Microsoft.Extensions.Logging; - -namespace Microsoft.Azure.Devices.Edge.Azure.Monitor -{ - - public class ModuleClientWrapper : IDisposable - { - ModuleClient inner; - SemaphoreSlim moduleClientLock; - TransportType transportType; - - public ModuleClientWrapper(ModuleClient moduleClient, TransportType transportType, SemaphoreSlim moduleClientLock) - { - this.inner = Preconditions.CheckNotNull(moduleClient); - this.transportType = Preconditions.CheckNotNull(transportType); - this.moduleClientLock = Preconditions.CheckNotNull(moduleClientLock); - } - - public static async Task BuildModuleClientWrapperAsync(TransportType transportType) - { - SemaphoreSlim moduleClientLock = new SemaphoreSlim(1, 1); - ModuleClient moduleClient = await InitializeModuleClientAsync(transportType); - return new ModuleClientWrapper(moduleClient, transportType, moduleClientLock); - } - - public async Task RecreateClientAsync() - { - await this.moduleClientLock.WaitAsync(); - - try - { - this.inner.Dispose(); - this.inner = await InitializeModuleClientAsync(this.transportType); - LoggerUtil.Writer.LogInformation("Closed and re-established connection to IoT Hub"); - } - catch (Exception e) - { - LoggerUtil.Writer.LogWarning($"Failed closing and re-establishing connection to IoT Hub: {e.ToString()}"); - } - - this.moduleClientLock.Release(); - } - - public async Task SendMessage(string outputName, Message message) - { - await this.moduleClientLock.WaitAsync(); - - try - { - await this.inner.SendEventAsync(outputName, message); - } - catch (Exception e) - { - LoggerUtil.Writer.LogError($"Failed sending metrics as IoT message: {e.ToString()}"); - } - - this.moduleClientLock.Release(); - } - - public void Dispose() - { - this.inner.Dispose(); - this.moduleClientLock.Dispose(); - } - - static async Task InitializeModuleClientAsync(TransportType transportType) - { - LoggerUtil.Writer.LogInformation($"Trying to initialize module client using transport type [{transportType}]"); - - ITransportSettings[] GetTransportSettings() - { - switch (transportType) - { - case TransportType.Mqtt: - case TransportType.Mqtt_Tcp_Only: - return new ITransportSettings[] { new MqttTransportSettings(TransportType.Mqtt_Tcp_Only) }; - case TransportType.Mqtt_WebSocket_Only: - return new ITransportSettings[] { new MqttTransportSettings(TransportType.Mqtt_WebSocket_Only) }; - case TransportType.Amqp_WebSocket_Only: - return new ITransportSettings[] { new AmqpTransportSettings(TransportType.Amqp_WebSocket_Only) }; - default: - return new ITransportSettings[] { new AmqpTransportSettings(TransportType.Amqp_Tcp_Only) }; - } - } - - ITransportSettings[] settings = GetTransportSettings(); - ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings); - moduleClient.ProductInfo = Constants.ProductInfo; - await moduleClient.OpenAsync(); - - LoggerUtil.Writer.LogInformation($"Successfully initialized module client using transport type [{transportType}]"); - - return moduleClient; - } - } -} diff --git a/edge-modules/metrics-collector/src/ModuleClientWrapper/AzureMonitorModuleClientWrapper.cs b/edge-modules/metrics-collector/src/ModuleClientWrapper/AzureMonitorModuleClientWrapper.cs new file mode 100644 index 00000000000..f9b65667cd1 --- /dev/null +++ b/edge-modules/metrics-collector/src/ModuleClientWrapper/AzureMonitorModuleClientWrapper.cs @@ -0,0 +1,66 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Devices.Client; +using Microsoft.Azure.Devices.Edge.Util; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper +{ + public class AzureMonitorClientWrapper : IDisposable, IModuleClientWrapper + { + Option inner; + + public AzureMonitorClientWrapper(Option basicModuleClientWrapper) + { + this.inner = basicModuleClientWrapper; + } + + public static Task BuildModuleClientWrapperAsync() + { + // Don't try to initialize client so this call won't block. + return Task.FromResult(new AzureMonitorClientWrapper(Option.None())); + } + + public async Task RecreateClientAsync() + { + await this.inner.Match(async (BasicModuleClientWrapper) => + { + try + { + await BasicModuleClientWrapper.RecreateClientAsync(); + } + catch (Exception) + { + this.inner = Option.None(); + } + }, async () => + { + try + { + this.inner = Option.Some(await BasicModuleClientWrapper.BuildModuleClientWrapperAsync()); + } + catch (Exception) + { + this.inner = Option.None(); + } + }); + } + + public Task SendMessageAsync(string outputName, Message message) + { + throw new Exception("Not expected to send metrics to IoT Hub when upload target is AzureMonitor"); + } + + public void Dispose() + { + this.inner.ForEach((basicModuleClientWrapper) => + { + basicModuleClientWrapper.Dispose(); + }); + } + } +} + + + diff --git a/edge-modules/metrics-collector/src/ModuleClientWrapper/BasicModuleClientWrapper.cs b/edge-modules/metrics-collector/src/ModuleClientWrapper/BasicModuleClientWrapper.cs new file mode 100644 index 00000000000..50c85bc3b13 --- /dev/null +++ b/edge-modules/metrics-collector/src/ModuleClientWrapper/BasicModuleClientWrapper.cs @@ -0,0 +1,87 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Devices.Client; +using Microsoft.Azure.Devices.Edge.Util; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper +{ + public class BasicModuleClientWrapper : IDisposable, IModuleClientWrapper + { + ModuleClient inner; + SemaphoreSlim moduleClientLock; + + public BasicModuleClientWrapper(ModuleClient moduleClient, SemaphoreSlim moduleClientLock) + { + this.inner = moduleClient; + this.moduleClientLock = Preconditions.CheckNotNull(moduleClientLock); + } + + public static async Task BuildModuleClientWrapperAsync() + { + SemaphoreSlim moduleClientLock = new SemaphoreSlim(1, 1); + ModuleClient moduleClient = await InitializeModuleClientAsync(); + return new BasicModuleClientWrapper(moduleClient, moduleClientLock); + } + + public async Task RecreateClientAsync() + { + + await this.moduleClientLock.WaitAsync(); + + try + { + this.inner.Dispose(); + this.inner = await InitializeModuleClientAsync(); + LoggerUtil.Writer.LogInformation("Closed and re-established connection to IoT Hub"); + } + catch (Exception) + { + this.moduleClientLock.Release(); + throw; + } + + this.moduleClientLock.Release(); + } + + public async Task SendMessageAsync(string outputName, Message message) + { + await this.moduleClientLock.WaitAsync(); + + try + { + await this.inner.SendEventAsync(outputName, message); + LoggerUtil.Writer.LogInformation("Successfully sent metrics via IoT message"); + } + catch (Exception) + { + this.moduleClientLock.Release(); + throw; + } + + this.moduleClientLock.Release(); + } + + public void Dispose() + { + this.inner.Dispose(); + this.moduleClientLock.Dispose(); + } + + static async Task InitializeModuleClientAsync() + { + TransportType transportType = TransportType.Amqp_Tcp_Only; + LoggerUtil.Writer.LogInformation($"Trying to initialize module client using transport type [{transportType}]"); + + ITransportSettings[] settings = new ITransportSettings[] { new AmqpTransportSettings(transportType) }; + ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings); + moduleClient.ProductInfo = Constants.ProductInfo; + + await moduleClient.OpenAsync(); + LoggerUtil.Writer.LogInformation($"Successfully initialized module client using transport type [{transportType}]"); + return moduleClient; + } + } +} + diff --git a/edge-modules/metrics-collector/src/ModuleClientWrapper/IotMessageModuleClientWrapper.cs b/edge-modules/metrics-collector/src/ModuleClientWrapper/IotMessageModuleClientWrapper.cs new file mode 100644 index 00000000000..1d832e1850b --- /dev/null +++ b/edge-modules/metrics-collector/src/ModuleClientWrapper/IotMessageModuleClientWrapper.cs @@ -0,0 +1,60 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Devices.Client; +using Microsoft.Azure.Devices.Edge.Util; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper +{ + public class IotMessageModuleClientWrapper : IDisposable, IModuleClientWrapper + { + BasicModuleClientWrapper inner; + CancellationTokenSource cancellationTokenSource; + + public IotMessageModuleClientWrapper(BasicModuleClientWrapper basicModuleClientWrapper, CancellationTokenSource cts) + { + this.inner = basicModuleClientWrapper; + this.cancellationTokenSource = cts; + } + + public static async Task BuildModuleClientWrapperAsync(CancellationTokenSource cts) + { + BasicModuleClientWrapper basicModuleClientWrapper = await BasicModuleClientWrapper.BuildModuleClientWrapperAsync(); + return new IotMessageModuleClientWrapper(basicModuleClientWrapper, cts); + } + + public async Task RecreateClientAsync() + { + try + { + await this.inner.RecreateClientAsync(); + } + catch (Exception e) + { + LoggerUtil.Writer.LogError($"Failed closing and re-establishing connection to IoT Hub: {e.ToString()}"); + this.cancellationTokenSource.Cancel(); + } + } + + public async Task SendMessageAsync(string outputName, Message message) + { + try + { + await this.inner.SendMessageAsync(outputName, message); + } + catch (Exception e) + { + LoggerUtil.Writer.LogError($"Failed sending metrics as IoT message: {e.ToString()}"); + } + } + + public void Dispose() + { + this.inner.Dispose(); + } + } +} + + + diff --git a/edge-modules/metrics-collector/src/ModuleClientWrapper/ModuleClientWrapper.cs b/edge-modules/metrics-collector/src/ModuleClientWrapper/ModuleClientWrapper.cs new file mode 100644 index 00000000000..168c6bc4baa --- /dev/null +++ b/edge-modules/metrics-collector/src/ModuleClientWrapper/ModuleClientWrapper.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Devices.Client; +using Microsoft.Azure.Devices.Edge.Util; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper +{ + public interface IModuleClientWrapper + { + Task RecreateClientAsync(); + Task SendMessageAsync(string outputName, Message message); + + } +} diff --git a/edge-modules/metrics-collector/src/Program.cs b/edge-modules/metrics-collector/src/Program.cs index 76c259d5569..3b7dfc8760d 100644 --- a/edge-modules/metrics-collector/src/Program.cs +++ b/edge-modules/metrics-collector/src/Program.cs @@ -14,6 +14,7 @@ namespace Microsoft.Azure.Devices.Edge.Azure.Monitor using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Microsoft.Azure.Devices.Edge.Util; + using ModuleClientWrapper; using System.Diagnostics; @@ -45,27 +46,11 @@ static async Task MainAsync() #endif LoggerUtil.Writer.LogInformation($"Metrics collector initialized with the following settings:\r\n{Settings.Information}"); - ModuleClientWrapper moduleClientWrapper = null; + IModuleClientWrapper moduleClientWrapper = null; try { - try - { - moduleClientWrapper = await ModuleClientWrapper.BuildModuleClientWrapperAsync(Settings.Current.TransportType); - PeriodicTask periodicIothubConnect = new PeriodicTask(moduleClientWrapper.RecreateClientAsync, Settings.Current.IotHubConnectFrequency, TimeSpan.FromMinutes(1), LoggerUtil.Writer, "Reconnect to IoT Hub", true); - } - catch (Exception e) - { - String msg = String.Format("Error connecting to Edge Hub. Is Edge Hub up and running with settings for clients connecting over {0}? Exception: {1}", Settings.Current.TransportType, e); - if (Settings.Current.UploadTarget == UploadTarget.IotMessage) - { - LoggerUtil.Writer.LogError(msg); - throw; - } - else - { - LoggerUtil.Writer.LogWarning(msg); - } - } + moduleClientWrapper = await BuildModuleClientWrapperAsync(Settings.Current.UploadTarget, cts); + PeriodicTask periodicIothubConnect = new PeriodicTask(moduleClientWrapper.RecreateClientAsync, Settings.Current.IotHubConnectFrequency, TimeSpan.FromMinutes(1), LoggerUtil.Writer, "Reconnect to IoT Hub", true); MetricsScraper scraper = new MetricsScraper(Settings.Current.Endpoints); IMetricsPublisher publisher; @@ -92,7 +77,7 @@ static async Task MainAsync() } finally { - moduleClientWrapper?.Dispose(); + ((IDisposable)moduleClientWrapper)?.Dispose(); } completed.Set(); @@ -101,5 +86,17 @@ static async Task MainAsync() LoggerUtil.Writer.LogInformation("MetricsCollector Main() finished."); return 0; } + + static async Task BuildModuleClientWrapperAsync(UploadTarget uploadTarget, CancellationTokenSource cts) + { + if (uploadTarget == UploadTarget.AzureMonitor) + { + return await AzureMonitorClientWrapper.BuildModuleClientWrapperAsync(); + } + else + { + return await IotMessageModuleClientWrapper.BuildModuleClientWrapperAsync(cts); + } + } } } diff --git a/edge-modules/metrics-collector/src/README.md b/edge-modules/metrics-collector/src/README.md index 411388c31dd..016a4257195 100644 --- a/edge-modules/metrics-collector/src/README.md +++ b/edge-modules/metrics-collector/src/README.md @@ -69,9 +69,6 @@ Optional config items: - Configurable azure domain which is used to construct the log analytics upload address. - ex: `azure.com.cn` - Defaults to `azure.com` -- `TransportType` - - The transport used to connect to IoT Hub when `UploadTarget` is `IotMessage`. Valid values are `Amqp_Tcp_Only`, `Amqp_WebSocket_Only`, `Mqtt_Tcp_Only`, and `Mqtt_WebSocket_Only`. `Amqp` and `Mqtt` are also valid, and are aliases for `Amqp_Tcp_Only` and `Mqtt_Tcp_Only`, respectively. Default is `Amqp_Tcp_Only`. - ## Upload Target: diff --git a/edge-modules/metrics-collector/src/Settings.cs b/edge-modules/metrics-collector/src/Settings.cs index 073fee6657d..83598256efa 100644 --- a/edge-modules/metrics-collector/src/Settings.cs +++ b/edge-modules/metrics-collector/src/Settings.cs @@ -12,7 +12,6 @@ namespace Microsoft.Azure.Devices.Edge.Azure.Monitor using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Microsoft.Azure.Devices.Edge.Util; - using Microsoft.Azure.Devices.Client; internal class Settings { @@ -33,8 +32,7 @@ private Settings( string resourceId, string version, TimeSpan iotHubConnectFrequency, - string azureDomain, - TransportType transportType) + string azureDomain) { this.UploadTarget = uploadTarget; this.ResourceId = Preconditions.CheckNonWhiteSpace(resourceId, nameof(resourceId)); @@ -72,8 +70,6 @@ private Settings( this.Version = version; this.IotHubConnectFrequency = iotHubConnectFrequency; this.AzureDomain = azureDomain; - - this.TransportType = transportType; } private static Settings Create() @@ -100,8 +96,7 @@ private static Settings Create() configuration.GetValue("ResourceID", ""), configuration.GetValue("version", ""), configuration.GetValue("IotHubConnectFrequency", TimeSpan.FromDays(1)), - configuration.GetValue("AzureDomain", "azure.com"), - configuration.GetValue("transportType", TransportType.Amqp_Tcp_Only) + configuration.GetValue("AzureDomain", "azure.com") ); } catch (ArgumentException e) @@ -133,8 +128,7 @@ private static Settings GetInformation() regex.Replace(settings.ResourceId, "$1" + "XXX" + "$3" + "XXX" + "$5"), settings.Version, settings.IotHubConnectFrequency, - settings.AzureDomain, - settings.TransportType); + settings.AzureDomain); } public string LogAnalyticsWorkspaceId { get; } @@ -165,8 +159,6 @@ private static Settings GetInformation() public string AzureDomain { get; } - public TransportType TransportType { get; } - // Used to eliminate secrets when logging the settings public override string ToString() { @@ -184,7 +176,6 @@ public override string ToString() { nameof(this.ResourceId), this.ResourceId ?? string.Empty }, { nameof(this.IotHubConnectFrequency), this.IotHubConnectFrequency.ToString() ?? string.Empty }, { nameof(this.AzureDomain), this.AzureDomain ?? string.Empty }, - { nameof(this.TransportType), this.TransportType.ToString() } }; return $"Settings:{Environment.NewLine}{string.Join(Environment.NewLine, fields.Select(f => $"{f.Key}={f.Value}"))}";