diff --git a/edge-modules/metrics-collector/CHANGELOG.md b/edge-modules/metrics-collector/CHANGELOG.md index 41bf83bf4db..e9c3a374fdc 100644 --- a/edge-modules/metrics-collector/CHANGELOG.md +++ b/edge-modules/metrics-collector/CHANGELOG.md @@ -1,8 +1,7 @@ -# 1.1.0 (2022-09-02) -### Features -* Add support for configurable transport type [b39d6c5](https://github.com/azure/iotedge/commit/b39d6c5c295cb81a9150141ace9a9909bc2b3686) +# 1.1.0 (2022-09-09) ### Bug Fixes +* Fix bug where failing connection to Edge Hub blocks independent AzureMonitor upload path [b39d6c5](https://github.com/azure/iotedge/commit/b39d6c5c295cb81a9150141ace9a9909bc2b3686) * Update SharpZipLib and Newtonsoft.Json to patch security vulnerability [1b483e4](https://github.com/Azure/iotedge/commit/1b483e4f114593b9cb40be65598c83bea6811444) * Update base image to include .NET security fixes from [.NET Core 3.1.28 - August 9, 2022](https://github.com/dotnet/core/blob/main/release-notes/3.1/3.1.28/3.1.28.md) diff --git a/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs b/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs index 33677f44d50..3a4feecdcd8 100644 --- a/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs +++ b/edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs @@ -13,13 +13,14 @@ namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.IotHubMetricsUpload using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Microsoft.Azure.Devices.Edge.Util; + using ModuleClientWrapper; public class IotHubMetricsUpload : IMetricsPublisher { const string IdentifierPropertyName = "id"; - readonly ModuleClientWrapper ModuleClientWrapper; + readonly IModuleClientWrapper ModuleClientWrapper; - public IotHubMetricsUpload(ModuleClientWrapper moduleClientWrapper) + public IotHubMetricsUpload(IModuleClientWrapper moduleClientWrapper) { this.ModuleClientWrapper = Preconditions.CheckNotNull(moduleClientWrapper, nameof(moduleClientWrapper)); } @@ -47,9 +48,8 @@ public async Task PublishAsync(IEnumerable metrics, CancellationTo Message metricsMessage = new Message(metricsData); metricsMessage.Properties[IdentifierPropertyName] = Constants.IoTUploadMessageIdentifier; - await this.ModuleClientWrapper.SendMessage("metricOutput", metricsMessage); + await this.ModuleClientWrapper.SendMessageAsync("metricOutput", metricsMessage); - LoggerUtil.Writer.LogInformation("Successfully sent metrics via IoT message"); return true; } catch (Exception e) diff --git a/edge-modules/metrics-collector/src/ModuleClientWrapper.cs b/edge-modules/metrics-collector/src/ModuleClientWrapper.cs deleted file mode 100644 index f7f47250f3a..00000000000 --- a/edge-modules/metrics-collector/src/ModuleClientWrapper.cs +++ /dev/null @@ -1,102 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Azure.Devices.Client; -using Microsoft.Azure.Devices.Client.Transport.Mqtt; -using Microsoft.Azure.Devices.Edge.Util; -using Microsoft.Extensions.Logging; - -namespace Microsoft.Azure.Devices.Edge.Azure.Monitor -{ - - public class ModuleClientWrapper : IDisposable - { - ModuleClient inner; - SemaphoreSlim moduleClientLock; - TransportType transportType; - - public ModuleClientWrapper(ModuleClient moduleClient, TransportType transportType, SemaphoreSlim moduleClientLock) - { - this.inner = Preconditions.CheckNotNull(moduleClient); - this.transportType = Preconditions.CheckNotNull(transportType); - this.moduleClientLock = Preconditions.CheckNotNull(moduleClientLock); - } - - public static async Task BuildModuleClientWrapperAsync(TransportType transportType) - { - SemaphoreSlim moduleClientLock = new SemaphoreSlim(1, 1); - ModuleClient moduleClient = await InitializeModuleClientAsync(transportType); - return new ModuleClientWrapper(moduleClient, transportType, moduleClientLock); - } - - public async Task RecreateClientAsync() - { - await this.moduleClientLock.WaitAsync(); - - try - { - this.inner.Dispose(); - this.inner = await InitializeModuleClientAsync(this.transportType); - LoggerUtil.Writer.LogInformation("Closed and re-established connection to IoT Hub"); - } - catch (Exception e) - { - LoggerUtil.Writer.LogWarning($"Failed closing and re-establishing connection to IoT Hub: {e.ToString()}"); - } - - this.moduleClientLock.Release(); - } - - public async Task SendMessage(string outputName, Message message) - { - await this.moduleClientLock.WaitAsync(); - - try - { - await this.inner.SendEventAsync(outputName, message); - } - catch (Exception e) - { - LoggerUtil.Writer.LogError($"Failed sending metrics as IoT message: {e.ToString()}"); - } - - this.moduleClientLock.Release(); - } - - public void Dispose() - { - this.inner.Dispose(); - this.moduleClientLock.Dispose(); - } - - static async Task InitializeModuleClientAsync(TransportType transportType) - { - LoggerUtil.Writer.LogInformation($"Trying to initialize module client using transport type [{transportType}]"); - - ITransportSettings[] GetTransportSettings() - { - switch (transportType) - { - case TransportType.Mqtt: - case TransportType.Mqtt_Tcp_Only: - return new ITransportSettings[] { new MqttTransportSettings(TransportType.Mqtt_Tcp_Only) }; - case TransportType.Mqtt_WebSocket_Only: - return new ITransportSettings[] { new MqttTransportSettings(TransportType.Mqtt_WebSocket_Only) }; - case TransportType.Amqp_WebSocket_Only: - return new ITransportSettings[] { new AmqpTransportSettings(TransportType.Amqp_WebSocket_Only) }; - default: - return new ITransportSettings[] { new AmqpTransportSettings(TransportType.Amqp_Tcp_Only) }; - } - } - - ITransportSettings[] settings = GetTransportSettings(); - ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings); - moduleClient.ProductInfo = Constants.ProductInfo; - await moduleClient.OpenAsync(); - - LoggerUtil.Writer.LogInformation($"Successfully initialized module client using transport type [{transportType}]"); - - return moduleClient; - } - } -} diff --git a/edge-modules/metrics-collector/src/ModuleClientWrapper/AzureMonitorModuleClientWrapper.cs b/edge-modules/metrics-collector/src/ModuleClientWrapper/AzureMonitorModuleClientWrapper.cs new file mode 100644 index 00000000000..f9b65667cd1 --- /dev/null +++ b/edge-modules/metrics-collector/src/ModuleClientWrapper/AzureMonitorModuleClientWrapper.cs @@ -0,0 +1,66 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Devices.Client; +using Microsoft.Azure.Devices.Edge.Util; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper +{ + public class AzureMonitorClientWrapper : IDisposable, IModuleClientWrapper + { + Option inner; + + public AzureMonitorClientWrapper(Option basicModuleClientWrapper) + { + this.inner = basicModuleClientWrapper; + } + + public static Task BuildModuleClientWrapperAsync() + { + // Don't try to initialize client so this call won't block. + return Task.FromResult(new AzureMonitorClientWrapper(Option.None())); + } + + public async Task RecreateClientAsync() + { + await this.inner.Match(async (BasicModuleClientWrapper) => + { + try + { + await BasicModuleClientWrapper.RecreateClientAsync(); + } + catch (Exception) + { + this.inner = Option.None(); + } + }, async () => + { + try + { + this.inner = Option.Some(await BasicModuleClientWrapper.BuildModuleClientWrapperAsync()); + } + catch (Exception) + { + this.inner = Option.None(); + } + }); + } + + public Task SendMessageAsync(string outputName, Message message) + { + throw new Exception("Not expected to send metrics to IoT Hub when upload target is AzureMonitor"); + } + + public void Dispose() + { + this.inner.ForEach((basicModuleClientWrapper) => + { + basicModuleClientWrapper.Dispose(); + }); + } + } +} + + + diff --git a/edge-modules/metrics-collector/src/ModuleClientWrapper/BasicModuleClientWrapper.cs b/edge-modules/metrics-collector/src/ModuleClientWrapper/BasicModuleClientWrapper.cs new file mode 100644 index 00000000000..50c85bc3b13 --- /dev/null +++ b/edge-modules/metrics-collector/src/ModuleClientWrapper/BasicModuleClientWrapper.cs @@ -0,0 +1,87 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Devices.Client; +using Microsoft.Azure.Devices.Edge.Util; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper +{ + public class BasicModuleClientWrapper : IDisposable, IModuleClientWrapper + { + ModuleClient inner; + SemaphoreSlim moduleClientLock; + + public BasicModuleClientWrapper(ModuleClient moduleClient, SemaphoreSlim moduleClientLock) + { + this.inner = moduleClient; + this.moduleClientLock = Preconditions.CheckNotNull(moduleClientLock); + } + + public static async Task BuildModuleClientWrapperAsync() + { + SemaphoreSlim moduleClientLock = new SemaphoreSlim(1, 1); + ModuleClient moduleClient = await InitializeModuleClientAsync(); + return new BasicModuleClientWrapper(moduleClient, moduleClientLock); + } + + public async Task RecreateClientAsync() + { + + await this.moduleClientLock.WaitAsync(); + + try + { + this.inner.Dispose(); + this.inner = await InitializeModuleClientAsync(); + LoggerUtil.Writer.LogInformation("Closed and re-established connection to IoT Hub"); + } + catch (Exception) + { + this.moduleClientLock.Release(); + throw; + } + + this.moduleClientLock.Release(); + } + + public async Task SendMessageAsync(string outputName, Message message) + { + await this.moduleClientLock.WaitAsync(); + + try + { + await this.inner.SendEventAsync(outputName, message); + LoggerUtil.Writer.LogInformation("Successfully sent metrics via IoT message"); + } + catch (Exception) + { + this.moduleClientLock.Release(); + throw; + } + + this.moduleClientLock.Release(); + } + + public void Dispose() + { + this.inner.Dispose(); + this.moduleClientLock.Dispose(); + } + + static async Task InitializeModuleClientAsync() + { + TransportType transportType = TransportType.Amqp_Tcp_Only; + LoggerUtil.Writer.LogInformation($"Trying to initialize module client using transport type [{transportType}]"); + + ITransportSettings[] settings = new ITransportSettings[] { new AmqpTransportSettings(transportType) }; + ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings); + moduleClient.ProductInfo = Constants.ProductInfo; + + await moduleClient.OpenAsync(); + LoggerUtil.Writer.LogInformation($"Successfully initialized module client using transport type [{transportType}]"); + return moduleClient; + } + } +} + diff --git a/edge-modules/metrics-collector/src/ModuleClientWrapper/IotMessageModuleClientWrapper.cs b/edge-modules/metrics-collector/src/ModuleClientWrapper/IotMessageModuleClientWrapper.cs new file mode 100644 index 00000000000..1d832e1850b --- /dev/null +++ b/edge-modules/metrics-collector/src/ModuleClientWrapper/IotMessageModuleClientWrapper.cs @@ -0,0 +1,60 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Devices.Client; +using Microsoft.Azure.Devices.Edge.Util; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper +{ + public class IotMessageModuleClientWrapper : IDisposable, IModuleClientWrapper + { + BasicModuleClientWrapper inner; + CancellationTokenSource cancellationTokenSource; + + public IotMessageModuleClientWrapper(BasicModuleClientWrapper basicModuleClientWrapper, CancellationTokenSource cts) + { + this.inner = basicModuleClientWrapper; + this.cancellationTokenSource = cts; + } + + public static async Task BuildModuleClientWrapperAsync(CancellationTokenSource cts) + { + BasicModuleClientWrapper basicModuleClientWrapper = await BasicModuleClientWrapper.BuildModuleClientWrapperAsync(); + return new IotMessageModuleClientWrapper(basicModuleClientWrapper, cts); + } + + public async Task RecreateClientAsync() + { + try + { + await this.inner.RecreateClientAsync(); + } + catch (Exception e) + { + LoggerUtil.Writer.LogError($"Failed closing and re-establishing connection to IoT Hub: {e.ToString()}"); + this.cancellationTokenSource.Cancel(); + } + } + + public async Task SendMessageAsync(string outputName, Message message) + { + try + { + await this.inner.SendMessageAsync(outputName, message); + } + catch (Exception e) + { + LoggerUtil.Writer.LogError($"Failed sending metrics as IoT message: {e.ToString()}"); + } + } + + public void Dispose() + { + this.inner.Dispose(); + } + } +} + + + diff --git a/edge-modules/metrics-collector/src/ModuleClientWrapper/ModuleClientWrapper.cs b/edge-modules/metrics-collector/src/ModuleClientWrapper/ModuleClientWrapper.cs new file mode 100644 index 00000000000..168c6bc4baa --- /dev/null +++ b/edge-modules/metrics-collector/src/ModuleClientWrapper/ModuleClientWrapper.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Devices.Client; +using Microsoft.Azure.Devices.Edge.Util; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.ModuleClientWrapper +{ + public interface IModuleClientWrapper + { + Task RecreateClientAsync(); + Task SendMessageAsync(string outputName, Message message); + + } +} diff --git a/edge-modules/metrics-collector/src/Program.cs b/edge-modules/metrics-collector/src/Program.cs index 76c259d5569..3b7dfc8760d 100644 --- a/edge-modules/metrics-collector/src/Program.cs +++ b/edge-modules/metrics-collector/src/Program.cs @@ -14,6 +14,7 @@ namespace Microsoft.Azure.Devices.Edge.Azure.Monitor using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Microsoft.Azure.Devices.Edge.Util; + using ModuleClientWrapper; using System.Diagnostics; @@ -45,27 +46,11 @@ static async Task MainAsync() #endif LoggerUtil.Writer.LogInformation($"Metrics collector initialized with the following settings:\r\n{Settings.Information}"); - ModuleClientWrapper moduleClientWrapper = null; + IModuleClientWrapper moduleClientWrapper = null; try { - try - { - moduleClientWrapper = await ModuleClientWrapper.BuildModuleClientWrapperAsync(Settings.Current.TransportType); - PeriodicTask periodicIothubConnect = new PeriodicTask(moduleClientWrapper.RecreateClientAsync, Settings.Current.IotHubConnectFrequency, TimeSpan.FromMinutes(1), LoggerUtil.Writer, "Reconnect to IoT Hub", true); - } - catch (Exception e) - { - String msg = String.Format("Error connecting to Edge Hub. Is Edge Hub up and running with settings for clients connecting over {0}? Exception: {1}", Settings.Current.TransportType, e); - if (Settings.Current.UploadTarget == UploadTarget.IotMessage) - { - LoggerUtil.Writer.LogError(msg); - throw; - } - else - { - LoggerUtil.Writer.LogWarning(msg); - } - } + moduleClientWrapper = await BuildModuleClientWrapperAsync(Settings.Current.UploadTarget, cts); + PeriodicTask periodicIothubConnect = new PeriodicTask(moduleClientWrapper.RecreateClientAsync, Settings.Current.IotHubConnectFrequency, TimeSpan.FromMinutes(1), LoggerUtil.Writer, "Reconnect to IoT Hub", true); MetricsScraper scraper = new MetricsScraper(Settings.Current.Endpoints); IMetricsPublisher publisher; @@ -92,7 +77,7 @@ static async Task MainAsync() } finally { - moduleClientWrapper?.Dispose(); + ((IDisposable)moduleClientWrapper)?.Dispose(); } completed.Set(); @@ -101,5 +86,17 @@ static async Task MainAsync() LoggerUtil.Writer.LogInformation("MetricsCollector Main() finished."); return 0; } + + static async Task BuildModuleClientWrapperAsync(UploadTarget uploadTarget, CancellationTokenSource cts) + { + if (uploadTarget == UploadTarget.AzureMonitor) + { + return await AzureMonitorClientWrapper.BuildModuleClientWrapperAsync(); + } + else + { + return await IotMessageModuleClientWrapper.BuildModuleClientWrapperAsync(cts); + } + } } } diff --git a/edge-modules/metrics-collector/src/README.md b/edge-modules/metrics-collector/src/README.md index 411388c31dd..016a4257195 100644 --- a/edge-modules/metrics-collector/src/README.md +++ b/edge-modules/metrics-collector/src/README.md @@ -69,9 +69,6 @@ Optional config items: - Configurable azure domain which is used to construct the log analytics upload address. - ex: `azure.com.cn` - Defaults to `azure.com` -- `TransportType` - - The transport used to connect to IoT Hub when `UploadTarget` is `IotMessage`. Valid values are `Amqp_Tcp_Only`, `Amqp_WebSocket_Only`, `Mqtt_Tcp_Only`, and `Mqtt_WebSocket_Only`. `Amqp` and `Mqtt` are also valid, and are aliases for `Amqp_Tcp_Only` and `Mqtt_Tcp_Only`, respectively. Default is `Amqp_Tcp_Only`. - ## Upload Target: diff --git a/edge-modules/metrics-collector/src/Settings.cs b/edge-modules/metrics-collector/src/Settings.cs index 073fee6657d..83598256efa 100644 --- a/edge-modules/metrics-collector/src/Settings.cs +++ b/edge-modules/metrics-collector/src/Settings.cs @@ -12,7 +12,6 @@ namespace Microsoft.Azure.Devices.Edge.Azure.Monitor using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Microsoft.Azure.Devices.Edge.Util; - using Microsoft.Azure.Devices.Client; internal class Settings { @@ -33,8 +32,7 @@ private Settings( string resourceId, string version, TimeSpan iotHubConnectFrequency, - string azureDomain, - TransportType transportType) + string azureDomain) { this.UploadTarget = uploadTarget; this.ResourceId = Preconditions.CheckNonWhiteSpace(resourceId, nameof(resourceId)); @@ -72,8 +70,6 @@ private Settings( this.Version = version; this.IotHubConnectFrequency = iotHubConnectFrequency; this.AzureDomain = azureDomain; - - this.TransportType = transportType; } private static Settings Create() @@ -100,8 +96,7 @@ private static Settings Create() configuration.GetValue("ResourceID", ""), configuration.GetValue("version", ""), configuration.GetValue("IotHubConnectFrequency", TimeSpan.FromDays(1)), - configuration.GetValue("AzureDomain", "azure.com"), - configuration.GetValue("transportType", TransportType.Amqp_Tcp_Only) + configuration.GetValue("AzureDomain", "azure.com") ); } catch (ArgumentException e) @@ -133,8 +128,7 @@ private static Settings GetInformation() regex.Replace(settings.ResourceId, "$1" + "XXX" + "$3" + "XXX" + "$5"), settings.Version, settings.IotHubConnectFrequency, - settings.AzureDomain, - settings.TransportType); + settings.AzureDomain); } public string LogAnalyticsWorkspaceId { get; } @@ -165,8 +159,6 @@ private static Settings GetInformation() public string AzureDomain { get; } - public TransportType TransportType { get; } - // Used to eliminate secrets when logging the settings public override string ToString() { @@ -184,7 +176,6 @@ public override string ToString() { nameof(this.ResourceId), this.ResourceId ?? string.Empty }, { nameof(this.IotHubConnectFrequency), this.IotHubConnectFrequency.ToString() ?? string.Empty }, { nameof(this.AzureDomain), this.AzureDomain ?? string.Empty }, - { nameof(this.TransportType), this.TransportType.ToString() } }; return $"Settings:{Environment.NewLine}{string.Join(Environment.NewLine, fields.Select(f => $"{f.Key}={f.Value}"))}";