Skip to content

Commit

Permalink
Simulator integration heartbeat API based (#82)
Browse files Browse the repository at this point in the history
* Start using newer SDK types

* More changes to accommodate the new API

* Fix ConnectorBase test

* Fix another test

* Fix more tests

* Update the version

* Remove unused tests

* Remove unused code

* Increase test coverage

* Cleanup

* Update version

* Update Cognite.Simulator.Extensions.csproj

* Update Cognite.Simulator.Extensions.csproj

* update codecov

---------

Co-authored-by: abdullah-cognite <[email protected]>
Co-authored-by: abdullah <[email protected]>
  • Loading branch information
3 people authored Dec 15, 2023
1 parent c28f4dc commit 5c06fe1
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 912 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
<None Include="..\LICENSE" Pack="true" Visible="false" PackagePath="" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Cognite.Extensions" Version="1.16.0" />
<PackageReference Include="Cognite.Extensions" Version="1.17.1" />
<!-- <ProjectReference Include="..\..\dotnet-extractor-utils\Cognite.Extensions\Cognite.Extensions.csproj" /> -->
</ItemGroup>

</Project>
57 changes: 0 additions & 57 deletions Cognite.Simulator.Extensions/DataModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,6 @@ public static class SimulatorIntegrationMetadata
/// Name of the connector handling the integration with a simulator
/// </summary>
public const string ConnectorNameKey = "connector";

/// <summary>
/// Data type of simulation integrations
/// </summary>
public const SimulatorDataType DataType = SimulatorDataType.SimulatorIntegration;
}

/// <summary>
Expand Down Expand Up @@ -316,58 +311,6 @@ public static class KeyValuePairSequenceColumns
public const string ValueName = "Value";
}

/// <summary>
/// Row keys that should be present in simulator integration sequences
/// </summary>
public static class SimulatorIntegrationSequenceRows
{
/// <summary>
/// Heartbeat key (Last time seen)
/// </summary>
public const string Heartbeat = "heartbeat";

/// <summary>
/// Data set id key. Data set containg the data used and generated by the simulator integration
/// </summary>
public const string DataSetId = "dataSetId";

/// <summary>
/// Connector version key. Version of the connector associated with this simulator integration
/// </summary>
public const string ConnectorVersion = "connectorVersion";

/// <summary>
/// Api enabled key. Indicates if the simulator api is enabled for simulation runs or if it's using CDF Events
/// </summary>
public const string SimulatorsApiEnabled = "apiEnabled";

/// <summary>
/// Simulator version key. Installed version of the simulator
/// </summary>
public const string SimulatorVersion = "simulatorVersion";

/// <summary>
/// License key (Last time seen)
/// </summary>
public const string LicenseTimestamp = "licenseLastCheckedTime";

/// <summary>
/// License key (Last result)
/// </summary>
public const string LicenseStatus = "licenseStatus";

/// <summary>
/// Status of the connector (indicates whether a calculation is running or not)
/// </summary>
public const string ConnectorStatus = "connectorStatus";

/// <summary>
/// Timestamp of when the connector went into the new connectorStatus state
/// </summary>
public const string ConnectorStatusTimestamp = "connectorStatusUpdatedTime";

}

/// <summary>
/// Types of simulator resources that can be stored in CDF
/// </summary>
Expand Down
273 changes: 0 additions & 273 deletions Cognite.Simulator.Extensions/SequencesExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,157 +64,6 @@ public static async Task<SequenceData> FindModelBoundaryConditions(
}, token).ConfigureAwait(false);
}

/// <summary>
/// For each simulator in <paramref name="integrations"/>, retrieve or create
/// a simulator integration sequence in CDF
/// </summary>
/// <param name="sequences">CDF sequences resource</param>
/// <param name="integrations">List of simulation integration details</param>
/// <param name="token">Cancellation token</param>
/// <returns>Retrieved or created sequences</returns>
/// <exception cref="SimulatorIntegrationSequenceException">Thrown when one or more sequences
/// could not be created. The exception contains the list of errors</exception>
public static async Task<IEnumerable<Sequence>> GetOrCreateSimulatorIntegrations(
this SequencesResource sequences,
IEnumerable<SimulatorIntegration> integrations,
CancellationToken token)
{
var result = new List<Sequence>();
if (integrations == null || !integrations.Any())
{
return result;
}

var toCreate = new Dictionary<string, SequenceCreate>();
foreach (var integration in integrations)
{
var metadata = new Dictionary<string, string>()
{
{ BaseMetadata.DataTypeKey, SimulatorIntegrationMetadata.DataType.MetadataValue() },
{ BaseMetadata.SimulatorKey, integration.Simulator },
{ SimulatorIntegrationMetadata.ConnectorNameKey, integration.ConnectorName }
};
var query = new SequenceQuery
{
Filter = new SequenceFilter
{
Metadata = metadata
}
};
var seqs = await sequences.ListAsync(query, token).ConfigureAwait(false);
if (seqs.Items.Any())
{
result.Add(seqs.Items.First());
}
else
{
metadata.Add(BaseMetadata.DataModelVersionKey, BaseMetadata.DataModelVersionValue);
var createObj = new SequenceCreate
{
Name = $"{integration.Simulator} Simulator Integration",
ExternalId = $"{integration.Simulator}-INTEGRATION-{DateTime.UtcNow.ToUnixTimeMilliseconds()}",
Description = $"Details about {integration.Simulator} integration",
Metadata = metadata,
Columns = GetKeyValueColumnWrite()
};
if (integration.DataSetId.HasValue)
{
createObj.DataSetId = integration.DataSetId.Value;
}
toCreate.Add(createObj.ExternalId, createObj);
}
}
if (toCreate.Any())
{
var createdSequences = await sequences.GetOrCreateAsync(
toCreate.Keys,
(ids) => toCreate
.Where(o => ids.Contains(o.Key))
.Select(o => o.Value).ToList(),
chunkSize: 10,
throttleSize: 1,
RetryMode.None,
SanitationMode.None,
token).ConfigureAwait(false);

if (!createdSequences.IsAllGood)
{
throw new SimulatorIntegrationSequenceException("Could not find or create simulator integration sequence", createdSequences.Errors);
}
if (createdSequences.Results != null)
{
result.AddRange(createdSequences.Results);
}
}
return result;
}

/// <summary>
/// Update the simulator integration sequence with the connector heartbeat (last time seen)
/// </summary>
/// <param name="sequences">CDF Sequences resource</param>
/// <param name="init">If true, the data set id, connector version and extra information rows are also
/// updated. Else, only the heartbeat row is updated</param>
/// <param name="sequenceExternalId">Simulator integration sequence external id</param>
/// <param name="update">Data to be updated, if init is set to true</param>
/// <param name="token">Cancellation token</param>
/// <param name="lastLicenseCheckTimestamp">Timestamp of the last license check</param>
/// <param name="lastLicenseCheckResult">Result of the last license check</param>
/// <exception cref="SimulatorIntegrationSequenceException">Thrown when one or more sequences
/// rows could not be updated. The exception contains the list of errors</exception>
public static async Task UpdateSimulatorIntegrationsData(
this SequencesResource sequences,
string sequenceExternalId,
bool init,
SimulatorIntegrationUpdate update,
CancellationToken token,
long lastLicenseCheckTimestamp,
string lastLicenseCheckResult)
{
var rowsToCreate = new List<SequenceDataCreate>();
var rowData = new Dictionary<string, string>();

rowData.Add(SimulatorIntegrationSequenceRows.Heartbeat, $"{DateTime.UtcNow.ToUnixTimeMilliseconds()}");
rowData.Add(SimulatorIntegrationSequenceRows.LicenseTimestamp, $"{lastLicenseCheckTimestamp}");
rowData.Add(SimulatorIntegrationSequenceRows.LicenseStatus, lastLicenseCheckResult);

if (init)
{
if (update == null)
{
throw new ArgumentNullException(nameof(update));
}
// Data set and version could only have changed on connector restart
if (update.DataSetId.HasValue)
{
rowData.Add(SimulatorIntegrationSequenceRows.DataSetId, $"{update.DataSetId.Value}");
}
rowData.Add(SimulatorIntegrationSequenceRows.ConnectorVersion, $"{update.ConnectorVersion}");
rowData.Add(SimulatorIntegrationSequenceRows.SimulatorVersion, $"{update.SimulatorVersion}");
rowData.Add(SimulatorIntegrationSequenceRows.SimulatorsApiEnabled, $"{update.SimulatorApiEnabled}");
if (update.ExtraInformation != null && update.ExtraInformation.Any())
{
update.ExtraInformation.ToList().ForEach(i => rowData.Add(i.Key, i.Value));
}
}

var rowCreate = ToSequenceData(rowData, sequenceExternalId, 0);
rowsToCreate.Add(rowCreate);
var result = await sequences.InsertAsync(
rowsToCreate,
keyChunkSize: 10,
valueChunkSize: 100,
sequencesChunk: 10,
throttleSize: 1,
RetryMode.None,
SanitationMode.None,
token).ConfigureAwait(false);
if (!result.IsAllGood)
{
throw new SimulatorIntegrationSequenceException("Could not update simulator integration sequence", result.Errors);
}
}

/// <summary>
/// Store tabular simulation results as sequences
/// </summary>
Expand Down Expand Up @@ -543,128 +392,6 @@ private static SequenceCreate GetSequenceCreatePrototype(
}
return seqCreate;
}

/// <summary>
/// Read the values of a <see cref="SequenceRow"/> and returns
/// it as a string array
/// </summary>
/// <param name="row">CDF Sequence row</param>
/// <returns>Array containing the row values</returns>
public static string[] GetStringValues(this SequenceRow row)
{
var result = new List<string>();
foreach (var val in row.Values)
{
if (val != null && val.Type == MultiValueType.STRING)
{
result.Add(((MultiValue.String)val).Value);
}
else
{
result.Add(null);
}
}
return result.ToArray();
}

/// <summary>
/// Simply upserts a key value pair into a key value pair based sequence
/// </summary>
/// <param name="sequences">CDF Sequence resource</param>
/// <param name="sequenceExternalId">External id of the sequence.</param>
/// <param name="updateKey">Key in sequence to update</param>
/// <param name="updateValue">Value to update the Key value pair to</param>
/// <param name="token">The cancellation token</param>
/// <returns>Nothing</returns>
public static async
Task UpsertItemInKVPSequence(this SequencesResource sequences, string sequenceExternalId, string updateKey, string updateValue, CancellationToken token) {
if (sequenceExternalId == "") {
throw new Exception("External Id is required to upsert a sequence");
}
SequenceRowQuery query = new SequenceRowQuery();
query.ExternalId = sequenceExternalId;
var sequenceList = await sequences.ListRowsAsync(query, token).ConfigureAwait(false);;
var rows = sequenceList.Rows;
var columns = sequenceList.Columns;
var newRows = rows.Select(r => new SequenceRow
{
RowNumber = r.RowNumber,
Values = r.Values
}).ToList();
// Checking to see if the key alread exists
var foundIndex = newRows.FindIndex(0, newRows.Count, (item => item.Values.FirstOrDefault().ToString().Contains(updateKey) ));
if (foundIndex == -1){
newRows.Insert(newRows.Count , new SequenceRow{
RowNumber = newRows.Count,
Values = new List<MultiValue> {
MultiValue.Create(updateKey),
MultiValue.Create(updateValue)
}
});
} else {
newRows[foundIndex].Values = new List<MultiValue> {
MultiValue.Create(updateKey),
MultiValue.Create(updateValue)
};
}

SequenceDataCreate sequenceData = new SequenceDataCreate
{
ExternalId = sequenceExternalId,
Columns = new List<string> {
KeyValuePairSequenceColumns.Key,
KeyValuePairSequenceColumns.Value
},
Rows = newRows
};
var rowsToCreate = new List<SequenceDataCreate>();
rowsToCreate.Add(sequenceData);
var result = await sequences.InsertAsync(
rowsToCreate,
keyChunkSize: 10,
valueChunkSize: 100,
sequencesChunk: 10,
throttleSize: 1,
RetryMode.None,
SanitationMode.None,
token
).ConfigureAwait(false);

if (!result.IsAllGood)
{
throw new SimulatorIntegrationSequenceException("Could not update simulator integration sequence", result.Errors);
}
}

/// <summary>
/// Gets the externalId of a SimulatorIntegrations sequence (This function can throw an exception)
/// </summary>
/// <param name="sequences">CDF Sequence resource</param>
/// <param name="simulatorName">The simulator name.</param>
/// <param name="simulatorDataSetId">The simulator datasetId</param>
/// <param name="connectorName">The connector Name</param>
/// <param name="token">The cancellation token</param>
/// <returns>externalId string</returns>
public static async Task<string> GetSequenceExternalId(this SequencesResource sequences, string simulatorName, long simulatorDataSetId, string connectorName, CancellationToken token) {
var simulatorSequenceIds = new Dictionary<string, string>();
var simulatorsDict = new SimulatorIntegration
{
Simulator = simulatorName,
DataSetId = simulatorDataSetId,
ConnectorName = connectorName
};

var integrations = await sequences.GetOrCreateSimulatorIntegrations(
new List<SimulatorIntegration> { simulatorsDict },
token).ConfigureAwait(false);
foreach (var integration in integrations)
{
simulatorSequenceIds.Add(
integration.Metadata[BaseMetadata.SimulatorKey],
integration.ExternalId);
}
return simulatorSequenceIds[simulatorName];
}
}

/// <summary>
Expand Down
Loading

0 comments on commit 5c06fe1

Please sign in to comment.