diff --git a/Cognite.Simulator.Extensions/Cognite.Simulator.Extensions.csproj b/Cognite.Simulator.Extensions/Cognite.Simulator.Extensions.csproj index 7d78c1a5..410d3686 100644 --- a/Cognite.Simulator.Extensions/Cognite.Simulator.Extensions.csproj +++ b/Cognite.Simulator.Extensions/Cognite.Simulator.Extensions.csproj @@ -21,7 +21,8 @@ - + + diff --git a/Cognite.Simulator.Extensions/DataModel.cs b/Cognite.Simulator.Extensions/DataModel.cs index c8f364fe..ceffa4f3 100644 --- a/Cognite.Simulator.Extensions/DataModel.cs +++ b/Cognite.Simulator.Extensions/DataModel.cs @@ -169,11 +169,6 @@ public static class SimulatorIntegrationMetadata /// Name of the connector handling the integration with a simulator /// public const string ConnectorNameKey = "connector"; - - /// - /// Data type of simulation integrations - /// - public const SimulatorDataType DataType = SimulatorDataType.SimulatorIntegration; } /// @@ -316,58 +311,6 @@ public static class KeyValuePairSequenceColumns public const string ValueName = "Value"; } - /// - /// Row keys that should be present in simulator integration sequences - /// - public static class SimulatorIntegrationSequenceRows - { - /// - /// Heartbeat key (Last time seen) - /// - public const string Heartbeat = "heartbeat"; - - /// - /// Data set id key. Data set containg the data used and generated by the simulator integration - /// - public const string DataSetId = "dataSetId"; - - /// - /// Connector version key. Version of the connector associated with this simulator integration - /// - public const string ConnectorVersion = "connectorVersion"; - - /// - /// Api enabled key. Indicates if the simulator api is enabled for simulation runs or if it's using CDF Events - /// - public const string SimulatorsApiEnabled = "apiEnabled"; - - /// - /// Simulator version key. Installed version of the simulator - /// - public const string SimulatorVersion = "simulatorVersion"; - - /// - /// License key (Last time seen) - /// - public const string LicenseTimestamp = "licenseLastCheckedTime"; - - /// - /// License key (Last result) - /// - public const string LicenseStatus = "licenseStatus"; - - /// - /// Status of the connector (indicates whether a calculation is running or not) - /// - public const string ConnectorStatus = "connectorStatus"; - - /// - /// Timestamp of when the connector went into the new connectorStatus state - /// - public const string ConnectorStatusTimestamp = "connectorStatusUpdatedTime"; - - } - /// /// Types of simulator resources that can be stored in CDF /// diff --git a/Cognite.Simulator.Extensions/SequencesExtensions.cs b/Cognite.Simulator.Extensions/SequencesExtensions.cs index 5bb65848..ef22465e 100644 --- a/Cognite.Simulator.Extensions/SequencesExtensions.cs +++ b/Cognite.Simulator.Extensions/SequencesExtensions.cs @@ -64,157 +64,6 @@ public static async Task FindModelBoundaryConditions( }, token).ConfigureAwait(false); } - /// - /// For each simulator in , retrieve or create - /// a simulator integration sequence in CDF - /// - /// CDF sequences resource - /// List of simulation integration details - /// Cancellation token - /// Retrieved or created sequences - /// Thrown when one or more sequences - /// could not be created. The exception contains the list of errors - public static async Task> GetOrCreateSimulatorIntegrations( - this SequencesResource sequences, - IEnumerable integrations, - CancellationToken token) - { - var result = new List(); - if (integrations == null || !integrations.Any()) - { - return result; - } - - var toCreate = new Dictionary(); - foreach (var integration in integrations) - { - var metadata = new Dictionary() - { - { BaseMetadata.DataTypeKey, SimulatorIntegrationMetadata.DataType.MetadataValue() }, - { BaseMetadata.SimulatorKey, integration.Simulator }, - { SimulatorIntegrationMetadata.ConnectorNameKey, integration.ConnectorName } - }; - var query = new SequenceQuery - { - Filter = new SequenceFilter - { - Metadata = metadata - } - }; - var seqs = await sequences.ListAsync(query, token).ConfigureAwait(false); - if (seqs.Items.Any()) - { - result.Add(seqs.Items.First()); - } - else - { - metadata.Add(BaseMetadata.DataModelVersionKey, BaseMetadata.DataModelVersionValue); - var createObj = new SequenceCreate - { - Name = $"{integration.Simulator} Simulator Integration", - ExternalId = $"{integration.Simulator}-INTEGRATION-{DateTime.UtcNow.ToUnixTimeMilliseconds()}", - Description = $"Details about {integration.Simulator} integration", - Metadata = metadata, - Columns = GetKeyValueColumnWrite() - }; - if (integration.DataSetId.HasValue) - { - createObj.DataSetId = integration.DataSetId.Value; - } - toCreate.Add(createObj.ExternalId, createObj); - } - } - if (toCreate.Any()) - { - var createdSequences = await sequences.GetOrCreateAsync( - toCreate.Keys, - (ids) => toCreate - .Where(o => ids.Contains(o.Key)) - .Select(o => o.Value).ToList(), - chunkSize: 10, - throttleSize: 1, - RetryMode.None, - SanitationMode.None, - token).ConfigureAwait(false); - - if (!createdSequences.IsAllGood) - { - throw new SimulatorIntegrationSequenceException("Could not find or create simulator integration sequence", createdSequences.Errors); - } - if (createdSequences.Results != null) - { - result.AddRange(createdSequences.Results); - } - } - return result; - } - - /// - /// Update the simulator integration sequence with the connector heartbeat (last time seen) - /// - /// CDF Sequences resource - /// If true, the data set id, connector version and extra information rows are also - /// updated. Else, only the heartbeat row is updated - /// Simulator integration sequence external id - /// Data to be updated, if init is set to true - /// Cancellation token - /// Timestamp of the last license check - /// Result of the last license check - /// Thrown when one or more sequences - /// rows could not be updated. The exception contains the list of errors - public static async Task UpdateSimulatorIntegrationsData( - this SequencesResource sequences, - string sequenceExternalId, - bool init, - SimulatorIntegrationUpdate update, - CancellationToken token, - long lastLicenseCheckTimestamp, - string lastLicenseCheckResult) - { - var rowsToCreate = new List(); - var rowData = new Dictionary(); - - rowData.Add(SimulatorIntegrationSequenceRows.Heartbeat, $"{DateTime.UtcNow.ToUnixTimeMilliseconds()}"); - rowData.Add(SimulatorIntegrationSequenceRows.LicenseTimestamp, $"{lastLicenseCheckTimestamp}"); - rowData.Add(SimulatorIntegrationSequenceRows.LicenseStatus, lastLicenseCheckResult); - - if (init) - { - if (update == null) - { - throw new ArgumentNullException(nameof(update)); - } - // Data set and version could only have changed on connector restart - if (update.DataSetId.HasValue) - { - rowData.Add(SimulatorIntegrationSequenceRows.DataSetId, $"{update.DataSetId.Value}"); - } - rowData.Add(SimulatorIntegrationSequenceRows.ConnectorVersion, $"{update.ConnectorVersion}"); - rowData.Add(SimulatorIntegrationSequenceRows.SimulatorVersion, $"{update.SimulatorVersion}"); - rowData.Add(SimulatorIntegrationSequenceRows.SimulatorsApiEnabled, $"{update.SimulatorApiEnabled}"); - if (update.ExtraInformation != null && update.ExtraInformation.Any()) - { - update.ExtraInformation.ToList().ForEach(i => rowData.Add(i.Key, i.Value)); - } - } - - var rowCreate = ToSequenceData(rowData, sequenceExternalId, 0); - rowsToCreate.Add(rowCreate); - var result = await sequences.InsertAsync( - rowsToCreate, - keyChunkSize: 10, - valueChunkSize: 100, - sequencesChunk: 10, - throttleSize: 1, - RetryMode.None, - SanitationMode.None, - token).ConfigureAwait(false); - if (!result.IsAllGood) - { - throw new SimulatorIntegrationSequenceException("Could not update simulator integration sequence", result.Errors); - } - } - /// /// Store tabular simulation results as sequences /// @@ -543,128 +392,6 @@ private static SequenceCreate GetSequenceCreatePrototype( } return seqCreate; } - - /// - /// Read the values of a and returns - /// it as a string array - /// - /// CDF Sequence row - /// Array containing the row values - public static string[] GetStringValues(this SequenceRow row) - { - var result = new List(); - foreach (var val in row.Values) - { - if (val != null && val.Type == MultiValueType.STRING) - { - result.Add(((MultiValue.String)val).Value); - } - else - { - result.Add(null); - } - } - return result.ToArray(); - } - - /// - /// Simply upserts a key value pair into a key value pair based sequence - /// - /// CDF Sequence resource - /// External id of the sequence. - /// Key in sequence to update - /// Value to update the Key value pair to - /// The cancellation token - /// Nothing - public static async - Task UpsertItemInKVPSequence(this SequencesResource sequences, string sequenceExternalId, string updateKey, string updateValue, CancellationToken token) { - if (sequenceExternalId == "") { - throw new Exception("External Id is required to upsert a sequence"); - } - SequenceRowQuery query = new SequenceRowQuery(); - query.ExternalId = sequenceExternalId; - var sequenceList = await sequences.ListRowsAsync(query, token).ConfigureAwait(false);; - var rows = sequenceList.Rows; - var columns = sequenceList.Columns; - var newRows = rows.Select(r => new SequenceRow - { - RowNumber = r.RowNumber, - Values = r.Values - }).ToList(); - // Checking to see if the key alread exists - var foundIndex = newRows.FindIndex(0, newRows.Count, (item => item.Values.FirstOrDefault().ToString().Contains(updateKey) )); - if (foundIndex == -1){ - newRows.Insert(newRows.Count , new SequenceRow{ - RowNumber = newRows.Count, - Values = new List { - MultiValue.Create(updateKey), - MultiValue.Create(updateValue) - } - }); - } else { - newRows[foundIndex].Values = new List { - MultiValue.Create(updateKey), - MultiValue.Create(updateValue) - }; - } - - SequenceDataCreate sequenceData = new SequenceDataCreate - { - ExternalId = sequenceExternalId, - Columns = new List { - KeyValuePairSequenceColumns.Key, - KeyValuePairSequenceColumns.Value - }, - Rows = newRows - }; - var rowsToCreate = new List(); - rowsToCreate.Add(sequenceData); - var result = await sequences.InsertAsync( - rowsToCreate, - keyChunkSize: 10, - valueChunkSize: 100, - sequencesChunk: 10, - throttleSize: 1, - RetryMode.None, - SanitationMode.None, - token - ).ConfigureAwait(false); - - if (!result.IsAllGood) - { - throw new SimulatorIntegrationSequenceException("Could not update simulator integration sequence", result.Errors); - } - } - - /// - /// Gets the externalId of a SimulatorIntegrations sequence (This function can throw an exception) - /// - /// CDF Sequence resource - /// The simulator name. - /// The simulator datasetId - /// The connector Name - /// The cancellation token - /// externalId string - public static async Task GetSequenceExternalId(this SequencesResource sequences, string simulatorName, long simulatorDataSetId, string connectorName, CancellationToken token) { - var simulatorSequenceIds = new Dictionary(); - var simulatorsDict = new SimulatorIntegration - { - Simulator = simulatorName, - DataSetId = simulatorDataSetId, - ConnectorName = connectorName - }; - - var integrations = await sequences.GetOrCreateSimulatorIntegrations( - new List { simulatorsDict }, - token).ConfigureAwait(false); - foreach (var integration in integrations) - { - simulatorSequenceIds.Add( - integration.Metadata[BaseMetadata.SimulatorKey], - integration.ExternalId); - } - return simulatorSequenceIds[simulatorName]; - } } /// diff --git a/Cognite.Simulator.Extensions/Types.cs b/Cognite.Simulator.Extensions/Types.cs index 9fef93be..2dc6d32d 100644 --- a/Cognite.Simulator.Extensions/Types.cs +++ b/Cognite.Simulator.Extensions/Types.cs @@ -374,58 +374,4 @@ public void OverwriteTimeSeriesId(string externalId) ExternalIdOverwrite = externalId; } } - - /// - /// Represents simulator integration information in CDF. Each connector can register one or more - /// simulator integration record with CDF - /// - public class SimulatorIntegration - { - /// - /// Name of the simulator - /// - public string Simulator { get; set; } - - /// - /// Name of the connector - /// - public string ConnectorName { get; set; } - - /// - /// ID of the data set that holds simulator data in CDF - /// - public long? DataSetId { get; set; } - } - - /// - /// Represents data used to update the simulator integration information in CDF - /// - public class SimulatorIntegrationUpdate : SimulatorIntegration - { - /// - /// Version of the connector - /// - public string ConnectorVersion { get; set; } - - /// - /// Version of the simulator - /// - public string SimulatorVersion { get; set; } - - /// - /// Flag indicating if the connector is using Cognite's Simulator Integration API - /// or just core CDF resources. - /// - public bool SimulatorApiEnabled { get; set; } - - /// - /// Flag indicating if there connector will check for simulator liceses - /// - public bool LicenseCheckEnabled { get; set; } - - /// - /// Any extra information that can be registered by the connector - /// - public Dictionary ExtraInformation { get; set; } = new Dictionary(); - } } diff --git a/Cognite.Simulator.Tests/ExtensionsTests/SequencesTest.cs b/Cognite.Simulator.Tests/ExtensionsTests/SequencesTest.cs index 15bbfa5f..12cb1153 100644 --- a/Cognite.Simulator.Tests/ExtensionsTests/SequencesTest.cs +++ b/Cognite.Simulator.Tests/ExtensionsTests/SequencesTest.cs @@ -41,234 +41,6 @@ public async Task TestFindModelBoundaryConditions() Assert.Equal(4, rows.Rows.First().Values.Count()); } - [Fact] - public async Task TestGetOrCreateSimulatorIntegration() - { - const string connectorName = "integration-tests-connector"; - const long dataSetId = CdfTestClient.TestDataset; - var services = new ServiceCollection(); - services.AddCogniteTestClient(); - - using var provider = services.BuildServiceProvider(); - var cdf = provider.GetRequiredService(); - var sequences = cdf.Sequences; - var simulators = new List - { - new SimulatorIntegration - { - Simulator = "PROSPER", // Assumes this one exists in CDF - DataSetId = dataSetId, - ConnectorName = connectorName, - }, - new SimulatorIntegration - { - Simulator = "SomeSimulator", // This one should be created - DataSetId = dataSetId, - ConnectorName = connectorName, - } - }; - - string? externalIdToDelete = null; - try - { - var integrations = await sequences.GetOrCreateSimulatorIntegrations( - simulators, - CancellationToken.None).ConfigureAwait(false); - - Assert.NotEmpty(integrations); - foreach (var sim in simulators) - { - var seq = Assert.Single(integrations, i => - i.DataSetId == sim.DataSetId && - i.Metadata[BaseMetadata.DataTypeKey] == SimulatorIntegrationMetadata.DataType.MetadataValue() && - i.Metadata[BaseMetadata.SimulatorKey] == sim.Simulator && - i.Metadata[SimulatorIntegrationMetadata.ConnectorNameKey] == sim.ConnectorName); - - Assert.Equal(2, seq.Columns.Count()); - if (sim.Simulator == "SomeSimulator") - { - externalIdToDelete = seq.ExternalId; - } - } - } - finally - { - // Cleanup created resources - if (externalIdToDelete != null) - { - await sequences.DeleteAsync(new List { externalIdToDelete }, CancellationToken.None).ConfigureAwait(false); - } - } - } - - [Fact] - public async Task TestUpsertItemInKVPSequence() - { - const string connectorName = "integration-tests-connector"; - const long dataSetId = CdfTestClient.TestDataset; - var services = new ServiceCollection(); - services.AddCogniteTestClient(); - - using var provider = services.BuildServiceProvider(); - var cdf = provider.GetRequiredService(); - var sequences = cdf.Sequences; - var simulators = new List - { - new SimulatorIntegration - { - Simulator = "TestSimulatorStatus", - DataSetId = dataSetId, - ConnectorName = connectorName, - } - }; - - string? externalId = null; - - try - { - // Create a test simulator integration sequence - var integrations = await sequences.GetOrCreateSimulatorIntegrations( - simulators, - CancellationToken.None).ConfigureAwait(false); - Assert.NotEmpty(integrations); - externalId = integrations.First().ExternalId; - var calculationStatus = "Calculation Running"; - await sequences.UpsertItemInKVPSequence( - externalId, - SimulatorIntegrationSequenceRows.ConnectorStatus, - calculationStatus, - CancellationToken.None).ConfigureAwait(false); - - // Verify that the sequence was updated correctly - var result = await sequences.ListRowsAsync(new SequenceRowQuery - { - ExternalId = externalId - }, CancellationToken.None).ConfigureAwait(false); - - Assert.NotEmpty(result.Columns); - Assert.Contains(result.Columns, c => c.ExternalId == KeyValuePairSequenceColumns.Key); - Assert.Contains(result.Columns, c => c.ExternalId == KeyValuePairSequenceColumns.Value); - - foreach(var row in result.Rows) - { - var values = row.GetStringValues(); - switch (values[0]) { - case SimulatorIntegrationSequenceRows.ConnectorStatus: - Assert.Equal(calculationStatus, values[1]); - break; - } - } - } - finally - { - // Cleanup created resources - if (externalId != null) - { - await sequences.DeleteAsync(new List { externalId }, CancellationToken.None).ConfigureAwait(false); - } - } - - } - - - [Fact] - public async Task TestUpdateSimulatorIntegrationsData() - { - const string connectorName = "integration-tests-connector"; - const long dataSetId = CdfTestClient.TestDataset; - var services = new ServiceCollection(); - services.AddCogniteTestClient(); - using var provider = services.BuildServiceProvider(); - var cdf = provider.GetRequiredService(); - var sequences = cdf.Sequences; - var simulators = new List - { - new SimulatorIntegration - { - Simulator = "TestHeartbeatSimulator", - DataSetId = dataSetId, - ConnectorName = connectorName, - } - }; - - string? externalIdToDelete = null; - try - { - // Create a test simulator integration sequence - var integrations = await sequences.GetOrCreateSimulatorIntegrations( - simulators, - CancellationToken.None).ConfigureAwait(false); - Assert.NotEmpty(integrations); - externalIdToDelete = integrations.First().ExternalId; - - var now = DateTime.UtcNow.ToUnixTimeMilliseconds(); - - // Update the sequence with connector heartbeat and license timestamp - await sequences.UpdateSimulatorIntegrationsData( - externalIdToDelete, - true, - new SimulatorIntegrationUpdate - { - Simulator = "TestHeartbeatSimulator", - DataSetId = dataSetId, - ConnectorName = connectorName, - ConnectorVersion = "1.0.0", - SimulatorVersion = "1.2.3", - }, - CancellationToken.None, - lastLicenseCheckTimestamp: now, - lastLicenseCheckResult: "Available").ConfigureAwait(false); - - // Verify that the sequence was updated correctly - var result = await sequences.ListRowsAsync(new SequenceRowQuery - { - ExternalId = externalIdToDelete - }, CancellationToken.None).ConfigureAwait(false); - Assert.NotEmpty(result.Columns); - Assert.Contains(result.Columns, c => c.ExternalId == KeyValuePairSequenceColumns.Key); - Assert.Contains(result.Columns, c => c.ExternalId == KeyValuePairSequenceColumns.Value); - - foreach(var row in result.Rows) - { - var values = row.GetStringValues(); - switch (values[0]) { - case SimulatorIntegrationSequenceRows.Heartbeat: - Assert.True(long.TryParse(values[1], out long heartbeat) && heartbeat >= now); - break; - case SimulatorIntegrationSequenceRows.LicenseTimestamp: - Assert.True(long.TryParse(values[1], out long licenseTimestamp) && licenseTimestamp >= now); - break; - case SimulatorIntegrationSequenceRows.LicenseStatus: - Assert.Equal("Available", values[1]); - break; - case SimulatorIntegrationSequenceRows.DataSetId: - Assert.Equal(dataSetId.ToString(), values[1]); - break; - case SimulatorIntegrationSequenceRows.ConnectorVersion: - Assert.Equal("1.0.0", values[1]); - break; - case SimulatorIntegrationSequenceRows.SimulatorVersion: - Assert.Equal("1.2.3", values[1]); - break; - case SimulatorIntegrationSequenceRows.SimulatorsApiEnabled: - Assert.False(Boolean.Parse(values[1])); - break; - default: - Assert.True(false); - break; - } - } - } - finally - { - // Cleanup created resources - if (externalIdToDelete != null) - { - await sequences.DeleteAsync(new List { externalIdToDelete }, CancellationToken.None).ConfigureAwait(false); - } - } - } - [Fact] public async Task TestStoreSimulationResults() { diff --git a/Cognite.Simulator.Tests/TestHelpers.cs b/Cognite.Simulator.Tests/TestHelpers.cs new file mode 100644 index 00000000..d2ec7801 --- /dev/null +++ b/Cognite.Simulator.Tests/TestHelpers.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Cognite.Extractor.Common; +using CogniteSdk; +using CogniteSdk.Alpha; + +namespace Cognite.Simulator.Tests { + public class TestHelpers { + + public static async Task SimulateProsperRunningAsync(Client cdf, string connectorName = "scheduler-test-connector" ) { + var integrations = await cdf.Alpha.Simulators.ListSimulatorIntegrationsAsync( + new SimulatorIntegrationQuery + { + } + ).ConfigureAwait(false); + var existing = integrations.Items.FirstOrDefault(i => i.ExternalId == connectorName && i.SimulatorExternalId == "PROSPER"); + if (existing == null) { + await cdf.Alpha.Simulators.CreateSimulatorIntegrationAsync( + new List + { + new SimulatorIntegrationCreate + { + ExternalId = connectorName, + SimulatorExternalId = "PROSPER", + DataSetId = CdfTestClient.TestDataset, + Heartbeat = DateTime.UtcNow.ToUnixTimeMilliseconds(), + ConnectorVersion = "N/A", + SimulatorVersion = "N/A", + RunApiEnabled = true, + } + } + ).ConfigureAwait(false); + } else { + await cdf.Alpha.Simulators.UpdateSimulatorIntegrationAsync( + new List + { + new SimulatorIntegrationUpdateItem(existing.Id) + { + Update = new SimulatorIntegrationUpdate + { + Heartbeat = new Update(DateTime.UtcNow.ToUnixTimeMilliseconds()) + } + } + } + ).ConfigureAwait(false); + } + } + } +} diff --git a/Cognite.Simulator.Tests/UtilsTests/ConnectorBaseTest.cs b/Cognite.Simulator.Tests/UtilsTests/ConnectorBaseTest.cs index d4528fd3..3fd02e2c 100644 --- a/Cognite.Simulator.Tests/UtilsTests/ConnectorBaseTest.cs +++ b/Cognite.Simulator.Tests/UtilsTests/ConnectorBaseTest.cs @@ -4,6 +4,7 @@ using Cognite.Simulator.Extensions; using Cognite.Simulator.Utils; using CogniteSdk; +using CogniteSdk.Alpha; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using System; @@ -19,9 +20,15 @@ namespace Cognite.Simulator.Tests.UtilsTests [Collection(nameof(SequentialTestCollection))] public class ConnectorBaseTest { + /// + /// Test that the connector can report status back to CDF + /// It also checks whether extraction pipeline is created + /// [Fact] public async Task TestConnectorBase() { + var timestamp = DateTime.UtcNow.ToUnixTimeMilliseconds(); + var simulatorName = $"TestSim {timestamp}"; var services = new ServiceCollection(); services.AddCogniteTestClient(); services.AddLogger(); @@ -31,7 +38,7 @@ public async Task TestConnectorBase() services.AddSingleton(); var simConfig = new SimulatorConfig { - Name = "TestSim", + Name = simulatorName, DataSetId = CdfTestClient.TestDataset }; services.AddSingleton(simConfig); @@ -44,38 +51,38 @@ public async Task TestConnectorBase() var cdf = provider.GetRequiredService(); var cdfConfig = provider.GetRequiredService(); - string? externalIdToDelete = null; + // prepopulate the TestSim simulator + await cdf.Alpha.Simulators.CreateAsync( + new [] + { + new SimulatorCreate() + { + ExternalId = simulatorName, + Name = "TestSim", + FileExtensionTypes = new List { "test" }, + Enabled = true, + } + } + ).ConfigureAwait(false); + try { - var timestamp = DateTime.UtcNow.ToUnixTimeMilliseconds(); await connector .Init(source.Token) .ConfigureAwait(false); - externalIdToDelete = connector.GetSimulatorIntegartionExternalId("TestSim"); - Assert.NotNull(externalIdToDelete); - - var rowQuery = new SequenceRowQuery - { - ExternalId = externalIdToDelete, - }; - - var rowsResult = await cdf.Sequences.ListRowsAsync( - rowQuery, + var integrationsRes = await cdf.Alpha.Simulators.ListSimulatorIntegrationsAsync( + new SimulatorIntegrationQuery(), source.Token).ConfigureAwait(false); - Assert.NotNull(rowsResult); - Assert.NotEmpty(rowsResult.Rows); - - IDictionary resultDict = rowsResult.Rows.ToDictionary( - r => r.GetStringValues()[0], r => r.GetStringValues()[1]); - var heartbeat = Assert.Contains(SimulatorIntegrationSequenceRows.Heartbeat, resultDict); - var connVersion = Assert.Contains(SimulatorIntegrationSequenceRows.ConnectorVersion, resultDict); - var simDataset = Assert.Contains(SimulatorIntegrationSequenceRows.DataSetId, resultDict); - var simVersion = Assert.Contains(SimulatorIntegrationSequenceRows.SimulatorVersion, resultDict); - Assert.True(long.Parse(heartbeat) > timestamp); - Assert.Equal(connector.GetConnectorVersion(), connVersion); - Assert.Equal(CdfTestClient.TestDataset, long.Parse(simDataset)); - Assert.Equal("1.2.3", simVersion); + var integration = integrationsRes.Items.FirstOrDefault(i => i.SimulatorExternalId == simulatorName); + + Assert.NotNull(integration); + Assert.Equal(simulatorName, integration.SimulatorExternalId); + Assert.Equal("1.2.3", integration.SimulatorVersion); + Assert.Equal(CdfTestClient.TestDataset, integration.DataSetId); + Assert.Equal("v0.0.1", integration.ConnectorVersion); + Assert.StartsWith($"Test Connector", integration.ExternalId); + Assert.True(integration.Heartbeat >= timestamp); // Start the connector loop and cancel it after 5 seconds. Should be enough time // to report a heartbeat back to CDF at least once. @@ -84,15 +91,6 @@ await connector linkedTokenSource.CancelAfter(TimeSpan.FromSeconds(5)); await connector.Run(linkedToken).ConfigureAwait(false); - rowsResult = await cdf.Sequences.ListRowsAsync( - rowQuery, - source.Token).ConfigureAwait(false); - - resultDict = rowsResult.Rows.ToDictionary( - r => r.GetStringValues()[0], r => r.GetStringValues()[1]); - var lastHeartbeat = Assert.Contains(SimulatorIntegrationSequenceRows.Heartbeat, resultDict); - Assert.True(long.Parse(lastHeartbeat) > long.Parse(heartbeat)); - var pipelines = await cdf.ExtPipes.RetrieveAsync( new List { cdfConfig.ExtractionPipeline.PipelineId }, true, @@ -102,14 +100,11 @@ await connector } finally { - if (externalIdToDelete != null) - { - await cdf.Sequences - .DeleteAsync(new List { externalIdToDelete }, CancellationToken.None) - .ConfigureAwait(false); - } + await cdf.Alpha.Simulators.DeleteAsync( + new [] { new Identity(simulatorName) }, + source.Token).ConfigureAwait(false); await cdf.ExtPipes - .DeleteAsync(new []{ cdfConfig.ExtractionPipeline.PipelineId }, CancellationToken.None).ConfigureAwait(false); + .DeleteAsync(new []{ cdfConfig.ExtractionPipeline?.PipelineId }, CancellationToken.None).ConfigureAwait(false); } } } @@ -136,7 +131,7 @@ public TestConnector( cdf, new ConnectorConfig { - NamePrefix = "Test Connector", + NamePrefix = $"Test Connector {DateTime.UtcNow.ToUnixTimeMilliseconds()}", AddMachineNameSuffix = false }, new List @@ -168,7 +163,7 @@ public override string GetSimulatorVersion(string simulator) public override async Task Init(CancellationToken token) { - await EnsureSimulatorIntegrationsSequencesExists(token).ConfigureAwait(false); + await EnsureSimulatorIntegrationsExists(token).ConfigureAwait(false); await UpdateIntegrationRows( true, token).ConfigureAwait(false); diff --git a/Cognite.Simulator.Tests/UtilsTests/SimulationRunnerTest.cs b/Cognite.Simulator.Tests/UtilsTests/SimulationRunnerTest.cs index 2413c311..b1f1b515 100644 --- a/Cognite.Simulator.Tests/UtilsTests/SimulationRunnerTest.cs +++ b/Cognite.Simulator.Tests/UtilsTests/SimulationRunnerTest.cs @@ -1,4 +1,5 @@ -using Cognite.Extractor.StateStorage; +using Cognite.Extractor.Common; +using Cognite.Extractor.StateStorage; using Cognite.Extractor.Utils; using Cognite.Simulator.Extensions; using Cognite.Simulator.Utils; @@ -102,7 +103,7 @@ public async Task TestSimulationRunnerBase(String calculationName, String calcTy inTsIds.AddRange(inConstTsIds); } - await SimulateProsperRunningAsync(cdf, "integration-tests-connector").ConfigureAwait(true); + await TestHelpers.SimulateProsperRunningAsync(cdf, "integration-tests-connector").ConfigureAwait(true); var runs = await cdf.Alpha.Simulators.CreateSimulationRunsAsync( new List @@ -246,7 +247,7 @@ public async Task TestSimulationRunnerBase(String calculationName, String calcTy Assert.True(dictResult.ContainsKey("samplingStart")); Assert.True(dictResult.ContainsKey("validationEndOffset")); - SamplingRange range = new TimeRange() + SamplingRange range = new CogniteSdk.TimeRange() { Min = long.Parse(dictResult["samplingStart"]), Max = long.Parse(dictResult["samplingEnd"]) @@ -291,35 +292,7 @@ await cdf.TimeSeries.DeleteAsync(new TimeSeriesDelete } - public static async Task SimulateProsperRunningAsync( Client cdf, string connectorName = "scheduler-test-connector" ) { - - var simint = new SimulatorIntegration () { - Simulator = "PROSPER", - DataSetId = CdfTestClient.TestDataset, - ConnectorName = connectorName, - }; - var simulators = new List { simint }; - - var integrations = await cdf.Sequences.GetOrCreateSimulatorIntegrations( - simulators, - CancellationToken.None).ConfigureAwait(false); - - var sequenceExternalId = integrations.First().ExternalId; - - await cdf.Sequences.UpdateSimulatorIntegrationsData( - sequenceExternalId, - true, - new SimulatorIntegrationUpdate - { - Simulator = simint.Simulator, - DataSetId = simint.DataSetId, - ConnectorName = simint.ConnectorName, - SimulatorApiEnabled = true, - }, - CancellationToken.None, - lastLicenseCheckTimestamp: 0, - lastLicenseCheckResult: "Available").ConfigureAwait(false); - } + private static Dictionary ToRowDictionary(SequenceData data) { Dictionary result = new(); diff --git a/Cognite.Simulator.Tests/UtilsTests/SimulationSchedulerTest.cs b/Cognite.Simulator.Tests/UtilsTests/SimulationSchedulerTest.cs index 2617f3d3..1fc7b6e2 100644 --- a/Cognite.Simulator.Tests/UtilsTests/SimulationSchedulerTest.cs +++ b/Cognite.Simulator.Tests/UtilsTests/SimulationSchedulerTest.cs @@ -22,118 +22,6 @@ public class SimulationSchedulerTest { [Fact] public async Task TestSimulationSchedulerBase() - { - var services = new ServiceCollection(); - services.AddCogniteTestClient(); - services.AddHttpClient(); - services.AddSingleton(); - services.AddSingleton(new ConnectorConfig - { - NamePrefix = "scheduler-test-connector", - AddMachineNameSuffix = false, - SchedulerUpdateInterval = 2 - }); - services.AddSingleton(); - - StateStoreConfig stateConfig = null; - - List eventIds = new List(); - - using var source = new CancellationTokenSource(); - using var provider = services.BuildServiceProvider(); - var cdf = provider.GetRequiredService(); - try - { - var simint = new SimulatorIntegration () { - Simulator = "PROSPER", - DataSetId = CdfTestClient.TestDataset, - ConnectorName = "scheduler-test-connector", - }; - var simulators = new List { simint }; - - var integrations = await cdf.Sequences.GetOrCreateSimulatorIntegrations( - simulators, - CancellationToken.None).ConfigureAwait(false); - - var sequenceExternalId = integrations.First().ExternalId; - - await cdf.Sequences.UpdateSimulatorIntegrationsData( - sequenceExternalId, - true, - new SimulatorIntegrationUpdate - { - Simulator = simint.Simulator, - DataSetId = simint.DataSetId, - ConnectorName = simint.ConnectorName, - SimulatorApiEnabled = true, - }, - CancellationToken.None, - lastLicenseCheckTimestamp: 0, - lastLicenseCheckResult: "Available").ConfigureAwait(false); - - - stateConfig = provider.GetRequiredService(); - var configLib = provider.GetRequiredService(); - var scheduler = provider.GetRequiredService(); - - await configLib.Init(source.Token).ConfigureAwait(false); - - using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(source.Token); - var linkedToken = linkedTokenSource.Token; - linkedTokenSource.CancelAfter(TimeSpan.FromSeconds(10)); - var taskList = new List { scheduler.Run(linkedToken) }; - taskList.AddRange(configLib.GetRunTasks(linkedToken)); - await taskList.RunAll(linkedTokenSource).ConfigureAwait(false); - - Assert.NotEmpty(configLib.State); - var configState = Assert.Contains( - "PROSPER-SC-UserDefined-SST-Connector_Test_Model", // This simulator configuration should exist in CDF - (IReadOnlyDictionary)configLib.State); - var configObj = configLib.GetSimulationConfiguration( - "PROSPER", "Connector Test Model", "UserDefined", "SST"); - Assert.NotNull(configObj); - - // Should have created at least one simulation event ready to run - var events = await cdf.Alpha.Simulators.ListSimulationRunsAsync( - new SimulationRunQuery { - Filter = new SimulationRunFilter { - ModelName = configObj.ModelName, - RoutineName = configObj.CalculationName, - SimulatorName = configObj.Simulator, - Status = SimulationRunStatus.ready - } - }, source.Token).ConfigureAwait(false); - - Assert.NotEmpty(events.Items); - // order events.Items by created time in descending order and filter only items created in the last 10 seconds - var latestEvents = events.Items.OrderByDescending(e => e.CreatedTime).ToList(); - var latestEventsFiltered = latestEvents.Where(e => e.CreatedTime > DateTimeOffset.UtcNow.AddSeconds(-10).ToUnixTimeMilliseconds()).ToList(); - Assert.Contains(latestEventsFiltered, e => e.RunType == SimulationRunType.scheduled); - } - finally - { - if (eventIds.Any()) - { - await cdf.Events.DeleteAsync(eventIds, source.Token) - .ConfigureAwait(false); - - // TODO Delete simulation runs created on the API - } - provider.Dispose(); // Dispose provider to also dispose managed services - if (Directory.Exists("./configurations")) - { - Directory.Delete("./configurations", true); - } - if (stateConfig != null) - { - StateUtils.DeleteLocalFile(stateConfig.Location); - } - } - } - - [Fact] - [Trait("Category", "API")] - public async Task TestSimulationSchedulerBaseWithApi() { var services = new ServiceCollection(); services.AddCogniteTestClient(); @@ -156,34 +44,10 @@ public async Task TestSimulationSchedulerBaseWithApi() using var source = new CancellationTokenSource(); using var provider = services.BuildServiceProvider(); var cdf = provider.GetRequiredService(); - var simint = new SimulatorIntegration () { - Simulator = "PROSPER", - DataSetId = CdfTestClient.TestDataset, - ConnectorName = "scheduler-test-connector", - }; - var simulators = new List { simint }; + try { - var integrations = await cdf.Sequences.GetOrCreateSimulatorIntegrations( - simulators, - CancellationToken.None).ConfigureAwait(false); - - var sequenceExternalId = integrations.First().ExternalId; - - // Update the sequence with connector heartbeat - await cdf.Sequences.UpdateSimulatorIntegrationsData( - sequenceExternalId, - true, - new SimulatorIntegrationUpdate - { - Simulator = simint.Simulator, - DataSetId = simint.DataSetId, - ConnectorName = simint.ConnectorName, - SimulatorApiEnabled = true, - }, - CancellationToken.None, - lastLicenseCheckTimestamp: testStartTimeMillis, - lastLicenseCheckResult: "Available").ConfigureAwait(false); + await TestHelpers.SimulateProsperRunningAsync(cdf, "scheduler-test-connector").ConfigureAwait(false); stateConfig = provider.GetRequiredService(); var configLib = provider.GetRequiredService(); @@ -221,7 +85,10 @@ await cdf.Sequences.UpdateSimulatorIntegrationsData( Assert.NotEmpty(simRuns.Items); // check if there are any simulation runs in the time span of the test - Assert.Contains(simRuns.Items, r => r.CreatedTime > testStartTimeMillis && r.CreatedTime < DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()); + // with the run type set to scheduled + var latestEventsFiltered = simRuns.Items.Where(r => r.CreatedTime >= testStartTimeMillis && r.CreatedTime < DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()); + Assert.NotEmpty(latestEventsFiltered); + Assert.Contains(latestEventsFiltered, e => e.RunType == SimulationRunType.scheduled); } finally { diff --git a/Cognite.Simulator.Utils/Cognite.Simulator.Utils.csproj b/Cognite.Simulator.Utils/Cognite.Simulator.Utils.csproj index 8af28ba8..1fe4149b 100644 --- a/Cognite.Simulator.Utils/Cognite.Simulator.Utils.csproj +++ b/Cognite.Simulator.Utils/Cognite.Simulator.Utils.csproj @@ -21,7 +21,8 @@ - + + diff --git a/Cognite.Simulator.Utils/ConnectorBase.cs b/Cognite.Simulator.Utils/ConnectorBase.cs index 5351e70a..5ea19b13 100644 --- a/Cognite.Simulator.Utils/ConnectorBase.cs +++ b/Cognite.Simulator.Utils/ConnectorBase.cs @@ -9,6 +9,8 @@ using System.Threading; using System.Threading.Tasks; using Cognite.Extractor.Configuration; +using CogniteSdk.Alpha; +using CogniteSdk; namespace Cognite.Simulator.Utils { @@ -30,7 +32,7 @@ public abstract class ConnectorBase where T : BaseConfig protected IList Simulators { get; } private ConnectorConfig Config { get; } - private readonly Dictionary _simulatorSequenceIds; + private readonly Dictionary _simulatorIntegrationIds; private readonly ILogger> _logger; private readonly ConnectorConfig _config; @@ -58,31 +60,30 @@ public ConnectorBase( Cdf = cdf; Simulators = simulators; Config = config; - _simulatorSequenceIds = new Dictionary(); + _simulatorIntegrationIds = new Dictionary(); _logger = logger; _config = config; _remoteConfigManager = remoteConfigManager; } /// - /// Returns the external ID of the sequence in CDF that contains information - /// about the simulator integration, if any + /// Returns the ID of the simulator integration resource in CDF, if any /// /// Simulator name - /// External ID, or null if not found - public string GetSimulatorIntegartionExternalId(string simulator) + /// Simulator integration ID, or null if not found + public long? GetSimulatorIntegrationId(string simulator) { - if (!_simulatorSequenceIds.ContainsKey(simulator)) + if (!_simulatorIntegrationIds.ContainsKey(simulator)) { return null; } - return _simulatorSequenceIds[simulator]; + return _simulatorIntegrationIds[simulator]; } /// /// Initialize the connector. Should include any initialization tasks to be performed before the connector loop. /// This should include a call to - /// + /// /// /// Cancellation token public abstract Task Init(CancellationToken token); @@ -165,35 +166,59 @@ public virtual bool ApiEnabled() } /// - /// For each simulator specified in the configuration, create a sequence in CDF containing the - /// simulator name and connector name as meta-data. The sequence will have key-value pairs as - /// rows. The keys are: heartbeat, data set id and connector version. The rows will be updated + /// For each simulator specified in the configuration, create a simulator integration in CDF containing the + /// simulator name, connector name, data set id, connector version, etc. These parameters will be updated /// periodically by the connector, and indicate the status of the currently running connector to /// applications consuming this simulation integration data. /// - protected async Task EnsureSimulatorIntegrationsSequencesExists(CancellationToken token) + protected async Task EnsureSimulatorIntegrationsExists(CancellationToken token) { - var sequences = Cdf.CogniteClient.Sequences; - var simulatorsDict = Simulators.Select( - s => new SimulatorIntegration - { - Simulator = s.Name, - DataSetId = s.DataSetId, - ConnectorName = GetConnectorName() - }); + var simulatorsApi = Cdf.CogniteClient.Alpha.Simulators; try { - var integrations = await sequences.GetOrCreateSimulatorIntegrations( - simulatorsDict, + var integrationRes = await simulatorsApi.ListSimulatorIntegrationsAsync( + new SimulatorIntegrationQuery(), token).ConfigureAwait(false); - foreach (var integration in integrations) + var integrations = integrationRes.Items; + var connectorName = GetConnectorName(); + foreach (var simulator in Simulators) { - _simulatorSequenceIds.Add( - integration.Metadata[BaseMetadata.SimulatorKey], - integration.ExternalId); + var simulatorName = simulator.Name; + var existing = integrations.FirstOrDefault(i => i.ExternalId == connectorName && i.SimulatorExternalId == simulator.Name); + if (existing == null) + { + _logger.LogInformation("Creating new simulator integration for {Simulator}", simulatorName); + var existingSimulators = await Cdf.CogniteClient.Alpha.Simulators.ListAsync( + new SimulatorQuery (), + token).ConfigureAwait(false); + var existingSimulator = existingSimulators.Items.FirstOrDefault(s => s.ExternalId == simulatorName); + if (existingSimulator == null) + { + _logger.LogWarning("Simulator {Simulator} not found in CDF", simulatorName); + throw new ConnectorException($"Simulator {simulatorName} not found in CDF"); + } + var integrationToCreate = new SimulatorIntegrationCreate + { + ExternalId = connectorName, + SimulatorExternalId = simulatorName, + DataSetId = simulator.DataSetId, + ConnectorVersion = GetConnectorVersion(), + SimulatorVersion = GetSimulatorVersion(simulatorName), + RunApiEnabled = ApiEnabled(), + }; + var res = await simulatorsApi.CreateSimulatorIntegrationAsync(new List { + integrationToCreate + }, token).ConfigureAwait(false); + _simulatorIntegrationIds[simulatorName] = res.First().Id; + } + else + { + _logger.LogInformation("Found existing simulator integration for {Simulator}", simulatorName); + _simulatorIntegrationIds[simulatorName] = existing.Id; + } } } - catch (SimulatorIntegrationSequenceException e) + catch (CogniteException e) { throw new ConnectorException(e.Message, e.CogniteErrors); } @@ -212,33 +237,43 @@ protected async Task UpdateIntegrationRows( { LastLicenseCheckResult = ShouldLicenseCheck() ? "Not checked yet" : "License check disabled"; } - var sequences = Cdf.CogniteClient.Sequences; + var simulatorsApi = Cdf.CogniteClient.Alpha.Simulators; try { foreach (var simulator in Simulators) { - var update = init ? - new SimulatorIntegrationUpdate + var integrationUpdate = init ? new SimulatorIntegrationUpdate + { + DataSetId = new Update { Set = simulator.DataSetId }, + ConnectorVersion = new Update { Set = GetConnectorVersion() }, + SimulatorVersion = new Update { Set = GetSimulatorVersion(simulator.Name) }, + RunApiEnabled = new Update { Set = ApiEnabled() }, + ConnectorStatus = new Update { Set = "IDLE" }, + ConnectorStatusUpdatedTime = new Update { Set = DateTime.UtcNow.ToUnixTimeMilliseconds() }, + Heartbeat = new Update { Set = DateTime.UtcNow.ToUnixTimeMilliseconds() }, + LicenseLastCheckedTime = new Update { Set = LastLicenseCheckTimestamp }, + LicenseStatus = new Update { Set = LastLicenseCheckResult }, + } : new SimulatorIntegrationUpdate { + Heartbeat = new Update { Set = DateTime.UtcNow.ToUnixTimeMilliseconds() }, + LicenseLastCheckedTime = new Update { Set = LastLicenseCheckTimestamp }, + LicenseStatus = new Update { Set = LastLicenseCheckResult }, + }; + var simIntegrationId = GetSimulatorIntegrationId(simulator.Name); + if (simIntegrationId == null) { - Simulator = simulator.Name, - DataSetId = simulator.DataSetId, - ConnectorName = GetConnectorName(), - ConnectorVersion = GetConnectorVersion(), - SimulatorVersion = GetSimulatorVersion(simulator.Name), - ExtraInformation = GetExtraInformation(simulator.Name), - SimulatorApiEnabled = ApiEnabled(), + _logger.LogWarning("Simulator integration for {Simulator} not found", simulator.Name); + throw new ConnectorException($"Simulator integration for {simulator.Name} not found"); } - : null; - await sequences.UpdateSimulatorIntegrationsData( - _simulatorSequenceIds[simulator.Name], - init, - update, - token, - lastLicenseCheckTimestamp: LastLicenseCheckTimestamp, - lastLicenseCheckResult: LastLicenseCheckResult).ConfigureAwait(false); + var integrationUpdateItem = new UpdateItem(GetSimulatorIntegrationId(simulator.Name).Value) + { + Update = integrationUpdate, + }; + await simulatorsApi.UpdateSimulatorIntegrationAsync( + new [] { integrationUpdateItem }, + token).ConfigureAwait(false); } } - catch (SimulatorIntegrationSequenceException e) + catch (CogniteException e) { throw new ConnectorException(e.Message, e.CogniteErrors); } diff --git a/Cognite.Simulator.Utils/SimulationRunnerBase.cs b/Cognite.Simulator.Utils/SimulationRunnerBase.cs index f8bbb903..2fa20220 100644 --- a/Cognite.Simulator.Utils/SimulationRunnerBase.cs +++ b/Cognite.Simulator.Utils/SimulationRunnerBase.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.Data; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -46,7 +47,7 @@ public abstract class SimulationRunnerBase /// protected IConfigurationProvider ConfigurationLibrary { get; } - private string sequenceExternalId; + private long? simulatorIntegrationId; /// @@ -79,13 +80,12 @@ public SimulationRunnerBase( _logger = logger; ModelLibrary = modelLibrary; ConfigurationLibrary = configLibrary; - sequenceExternalId = ""; } private async Task UpdateSimulationRunStatus( - long runId, - SimulationRunStatus status, - string statusMessage, + long runId, + SimulationRunStatus status, + string statusMessage, CancellationToken token) { var res = await _cdfSimulators.SimulationRunCallbackAsync( @@ -100,8 +100,8 @@ private async Task UpdateSimulationRunStatus( } private async Task> FindSimulationRunsWithStatus( - Dictionary simulators, - SimulationRunStatus status, + Dictionary simulators, + SimulationRunStatus status, CancellationToken token) { var result = new List(); @@ -136,9 +136,9 @@ private async Task> FindSimulationEvents( CancellationToken token) { var simulationRuns = await FindSimulationRunsWithStatus( - simulatorDataSetMap, + simulatorDataSetMap, status, token).ConfigureAwait(false); - return simulationRuns.Select(r => new SimulationRunEvent(r)).ToList(); + return simulationRuns.Select(r => new SimulationRunEvent(r)).ToList(); } /// @@ -182,7 +182,8 @@ public async Task Run(CancellationToken token) .ToList(); // sort by event time - allEvents.Sort((e1, e2) => { + allEvents.Sort((e1, e2) => + { return e1.Run.CreatedTime > e2.Run.CreatedTime ? -1 : 1; }); foreach (SimulationRunEvent e in allEvents) @@ -199,7 +200,7 @@ public async Task Run(CancellationToken token) if (calcState == null || calcObj == null || calcObj.Connector != _connectorConfig.GetConnectorName()) { _logger.LogError("Skip simulation run that belongs to another connector: {Id} {Connector}", - eventId, + eventId, calcObj?.Connector); continue; } @@ -252,16 +253,16 @@ await InitSimulationRun( finally { await StoreRunConfiguration( - calcState, - calcObj, - startTime, - e, + calcState, + calcObj, + startTime, + e, token).ConfigureAwait(false); - + PublishSimulationRunStatus("IDLE", token); } - + } await Task.Delay(interval, token).ConfigureAwait(false); @@ -327,10 +328,13 @@ await StoreRunConfiguration( } U calcState; V calcConfig; - if (simEv.HasSimulationRun) { + if (simEv.HasSimulationRun) + { calcState = ConfigurationLibrary.GetSimulationConfigurationState(simulator, modelName, calcTypeUserDefined); calcConfig = ConfigurationLibrary.GetSimulationConfiguration(simulator, modelName, calcTypeUserDefined); - } else { + } + else + { calcState = ConfigurationLibrary.GetSimulationConfigurationState(simulator, modelName, calcType, calcTypeUserDefined); calcConfig = ConfigurationLibrary.GetSimulationConfiguration(simulator, modelName, calcType, calcTypeUserDefined); } @@ -369,19 +373,37 @@ protected abstract void InitSimulationEventMetadata( V configObj, Dictionary metadata); - async void PublishSimulationRunStatus(string runStatus, CancellationToken token) { - var sequences = _cdfSequences; - + async void PublishSimulationRunStatus(string runStatus, CancellationToken token) + { try { - if (sequenceExternalId == "" && _simulators.Count > 0) { - SimulatorConfig item = _simulators[0]; // Retrieve the first item - sequenceExternalId = await SequencesExtensions.GetSequenceExternalId(sequences, item.Name, item.DataSetId, _connectorConfig.GetConnectorName(), token).ConfigureAwait(false); + if (!simulatorIntegrationId.HasValue && _simulators.Count > 0) + { + SimulatorConfig simulator = _simulators[0]; // Retrieve the first item + var integrationRes = await _cdfSimulators.ListSimulatorIntegrationsAsync( + new SimulatorIntegrationQuery(), + token).ConfigureAwait(false); + var integration = integrationRes.Items.FirstOrDefault(i => i.SimulatorExternalId == simulator.Name && i.ExternalId == _connectorConfig.GetConnectorName()); + if (integration == null) + { + throw new ConnectorException($"Simulator integration for {simulator.Name} not found"); + } + simulatorIntegrationId = integration.Id; } - var now = $"{DateTime.UtcNow.ToUnixTimeMilliseconds()}"; - await SequencesExtensions.UpsertItemInKVPSequence(_cdfSequences, sequenceExternalId, SimulatorIntegrationSequenceRows.ConnectorStatus, runStatus, token).ConfigureAwait(false); - await SequencesExtensions.UpsertItemInKVPSequence(_cdfSequences, sequenceExternalId, SimulatorIntegrationSequenceRows.ConnectorStatusTimestamp, now, token).ConfigureAwait(false); - + var now = DateTime.UtcNow.ToUnixTimeMilliseconds(); + var simulatorIntegrationUpdate = new SimulatorIntegrationUpdate + { + ConnectorStatus = new Update(runStatus), + ConnectorStatusUpdatedTime = new Update(now) + }; + await _cdfSimulators.UpdateSimulatorIntegrationAsync( + new [] { + new SimulatorIntegrationUpdateItem(simulatorIntegrationId.Value) { + Update = simulatorIntegrationUpdate + } + }, + token + ).ConfigureAwait(false); } catch (Exception e) @@ -425,9 +447,9 @@ protected virtual async Task InitSimulationRun( if (simEv.HasSimulationRun) { simEv.Run = await UpdateSimulationRunStatus( - simEv.Run.Id, - SimulationRunStatus.running, - null, + simEv.Run.Id, + SimulationRunStatus.running, + null, token).ConfigureAwait(false); } else @@ -449,7 +471,7 @@ protected virtual async Task InitSimulationRun( throw new SimulationException($"Data sampling configuration for {configObj.CalculationName} missing"); } // Determine the validation end time - if (!simEv.HasSimulationRun + if (!simEv.HasSimulationRun && simEv.Event.Metadata.TryGetValue(SimulationEventMetadata.ValidationEndOverwriteKey, out string validationEndOverwrite) && long.TryParse(validationEndOverwrite, out long overwriteValue)) { @@ -457,7 +479,8 @@ protected virtual async Task InitSimulationRun( // the current time validationEnd = CogniteTime.FromUnixTimeMilliseconds(overwriteValue); } - else if (simEv.Run.ValidationEndTime.HasValue) { + else if (simEv.Run.ValidationEndTime.HasValue) + { // If the event contains a validation end overwrite, use that instead of // the current time validationEnd = CogniteTime.FromUnixTimeMilliseconds(simEv.Run.ValidationEndTime.Value); @@ -662,7 +685,7 @@ private async Task StoreRunConfiguration( _logger.LogDebug("Simulation run has no Event associated with it {Id}", simEv.Run.Id); return; } - simEvent = await _cdfEvents.GetAsync(simEv.Run.EventId.Value, token).ConfigureAwait(false);; + simEvent = await _cdfEvents.GetAsync(simEv.Run.EventId.Value, token).ConfigureAwait(false); } else { diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 00000000..fee92389 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,9 @@ +coverage: + precision: 2 + round: down + range: 70...100 + +status: + project: + default: + threshold: 2% \ No newline at end of file diff --git a/version b/version index e3010c23..e8bb775e 100644 --- a/version +++ b/version @@ -1 +1 @@ -1.0.0-alpha-016 +1.0.0-alpha-017