Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support remote config from extraction pipelines [DEGR-2723] #63

Merged
merged 30 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions Cognite.Simulator.Tests/UtilsTests/ConnectorBaseTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Cognite.Extractor.Common;
using Cognite.Extractor.Logging;
using Cognite.Extractor.Utils;
using Cognite.Simulator.Extensions;
using Cognite.Simulator.Utils;
Expand All @@ -23,6 +24,9 @@
{
var services = new ServiceCollection();
services.AddCogniteTestClient();
services.AddLogger();
services.AddSingleton<RemoteConfigManager<BaseConfig>>(provider => null!);
services.AddSingleton<BaseConfig>();
services.AddTransient<TestConnector>();
services.AddSingleton<ExtractionPipeline>();
var simConfig = new SimulatorConfig
Expand Down Expand Up @@ -90,7 +94,7 @@
Assert.True(long.Parse(lastHeartbeat) > long.Parse(heartbeat));

var pipelines = await cdf.ExtPipes.RetrieveAsync(
new List<string> { cdfConfig.ExtractionPipeline.PipelineId },

Check warning on line 97 in Cognite.Simulator.Tests/UtilsTests/ConnectorBaseTest.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

Check warning on line 97 in Cognite.Simulator.Tests/UtilsTests/ConnectorBaseTest.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'item' in 'void List<string>.Add(string item)'.
true,
source.Token).ConfigureAwait(false);
Assert.Contains(pipelines, p => p.ExternalId == cdfConfig.ExtractionPipeline.PipelineId);
Expand All @@ -105,25 +109,29 @@
.ConfigureAwait(false);
}
await cdf.ExtPipes
.DeleteAsync(new []{ cdfConfig.ExtractionPipeline.PipelineId }, CancellationToken.None).ConfigureAwait(false);

Check warning on line 112 in Cognite.Simulator.Tests/UtilsTests/ConnectorBaseTest.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
}
}
}


/// <summary>
/// Implements a simple mock connector that only reports
/// status back to CDF (Heartbeat)
/// </summary>
internal class TestConnector : ConnectorBase
internal class TestConnector : ConnectorBase<BaseConfig>
{
private readonly ExtractionPipeline _pipeline;
private readonly RemoteConfigManager<BaseConfig> _remoteConfigManager;
private readonly SimulatorConfig _config;



public TestConnector(
CogniteDestination cdf,
ExtractionPipeline pipeline,
SimulatorConfig config,
ILogger<ConnectorBase> logger) :
ILogger<TestConnector> logger,
RemoteConfigManager<BaseConfig> remoteConfigManager) :
base(
cdf,
new ConnectorConfig
Expand All @@ -135,10 +143,12 @@
{
config
},
logger)
logger,
remoteConfigManager)
{
_pipeline = pipeline;
_config = config;
_remoteConfigManager = remoteConfigManager;
}

public override string GetConnectorVersion()
Expand Down
40 changes: 37 additions & 3 deletions Cognite.Simulator.Utils/ConnectorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extractor.Configuration;

namespace Cognite.Simulator.Utils
{
Expand All @@ -16,7 +17,7 @@ namespace Cognite.Simulator.Utils
/// The connector information is saved as a CDF sequence, where the rows
/// are key/value pairs (see <seealso cref="SimulatorIntegrationSequenceRows"/>)
/// </summary>
public abstract class ConnectorBase
public abstract class ConnectorBase<T> where T : BaseConfig
{
/// <summary>
/// CDF client wrapper
Expand All @@ -30,11 +31,14 @@ public abstract class ConnectorBase
private ConnectorConfig Config { get; }

private readonly Dictionary<string, string> _simulatorSequenceIds;
private readonly ILogger<ConnectorBase> _logger;
private readonly ILogger<ConnectorBase<T>> _logger;
private readonly ConnectorConfig _config;

private long LastLicenseCheckTimestamp { get; set; }
private string LastLicenseCheckResult { get; set; }
private const int FIFTEEN_MIN = 9000;

private readonly RemoteConfigManager<T> _remoteConfigManager;

/// <summary>
/// Initialize the connector with the given parameters
Expand All @@ -43,18 +47,21 @@ public abstract class ConnectorBase
/// <param name="config">Connector configuration</param>
/// <param name="simulators">List of simulator configurations</param>
/// <param name="logger">Logger</param>
/// <param name="remoteConfigManager"></param>
public ConnectorBase(
CogniteDestination cdf,
ConnectorConfig config,
IList<SimulatorConfig> simulators,
ILogger<ConnectorBase> logger)
ILogger<ConnectorBase<T>> logger,
RemoteConfigManager<T> remoteConfigManager)
{
Cdf = cdf;
Simulators = simulators;
Config = config;
_simulatorSequenceIds = new Dictionary<string, string>();
_logger = logger;
_config = config;
_remoteConfigManager = remoteConfigManager;
}

/// <summary>
Expand Down Expand Up @@ -284,6 +291,33 @@ await UpdateIntegrationRows(false, token)
.ConfigureAwait(false);
}
}

/// <summary>
/// Task that runs in a loop, checking for new config in extraction pipelines
/// </summary>
public async Task CheckRemoteConfig(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
await Task
.Delay(FIFTEEN_MIN, token) // Run every 15 minutes
.ConfigureAwait(false);
_logger.LogDebug("Checking remote config updates");
if (_remoteConfigManager == null) return;
var newConfig = await _remoteConfigManager.FetchLatest(token).ConfigureAwait(false);
if (newConfig != null)
{
throw new NewConfigDetected();
}
}
}
}

/// <summary>
/// Exception used to restart connector
/// </summary>
public class NewConfigDetected : Exception
{
}

/// <summary>
Expand Down
51 changes: 51 additions & 0 deletions Cognite.Simulator.Utils/ExtractionPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Cognite.Extractor.Configuration;
using System.IO;

namespace Cognite.Simulator.Utils
{
Expand Down Expand Up @@ -237,5 +239,54 @@ public static void AddExtractionPipeline(this IServiceCollection services, Conne
services.AddSingleton(config.PipelineNotification);
services.AddScoped<ExtractionPipeline>();
}
/// <summary>
/// Use `type: remote` to fetch the config from Fusion, or use `type: local` to use the local file instead
/// Example from config.yml using the remote config from Fusion
/// type: remote # this is required
/// cognite:
/// project: ...
/// host: ...
/// extraction-pipeline:
/// pipeline-id: ... # as well as this
/// idp-authentication:
/// ...
/// </summary>
/// <typeparam name="T">The complete config object to be parsed</typeparam>
/// <param name="services">Service collection</param>
/// <param name="path">Path to config file</param>
/// <param name="types">Types to use for deserialization</param>
/// <param name="appId">App ID for measuring network data</param>
/// <param name="token">Cancellation token</param>
/// <param name="version">Config version</param>
/// <param name="acceptedConfigVersions">Accepted config versions</param>
public static async Task<T> AddConfiguration<T>(
this IServiceCollection services,
string path,
Type[] types,
string appId,
CancellationToken token,
int version = 1,
int[] acceptedConfigVersions = null) where T : BaseConfig
{
var localConfig = services.AddConfig<T>(path, version);
var remoteConfig = new RemoteConfig
{
Type = localConfig.Type,
Cognite = localConfig.Cognite
};

return await services.AddRemoteConfig<T>(
logger: null,
kbrattli marked this conversation as resolved.
Show resolved Hide resolved
path: path,
types: types,
appId: appId,
userAgent: null,
setDestination: true,
bufferConfigFile: false,
remoteConfig: remoteConfig,
token: token,
acceptedConfigVersions: acceptedConfigVersions
).ConfigureAwait(false);
}
}
}
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.0-alpha-013
1.0.0-alpha-014
Loading