From 6d2baf0e579722731004c827c6fc711a7c7da543 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Mon, 16 Dec 2024 13:59:31 +0100 Subject: [PATCH] feat(Runner): Added a new stand-alone execution mode to the runner, enabling it to run workflows autonomously, without the Synapse stack Signed-off-by: Charles d'Avernas --- .../Synapse.Api.Application.csproj | 2 +- .../Services/IDocumentApiClient.cs | 2 +- .../Synapse.Api.Client.Core.csproj | 2 +- .../Services/DocumentHttpApiClient.cs | 2 +- .../Synapse.Api.Client.Http.csproj | 2 +- .../Synapse.Api.Http/Synapse.Api.Http.csproj | 2 +- .../Synapse.Api.Server.csproj | 2 +- src/cli/Synapse.Cli/Synapse.Cli.csproj | 2 +- ...re.Infrastructure.Containers.Docker.csproj | 2 +- ...nfrastructure.Containers.Kubernetes.csproj | 2 +- .../Synapse.Core.Infrastructure.csproj | 2 +- src/core/Synapse.Core/Synapse.Core.csproj | 2 +- src/core/Synapse.Core/SynapseDefaults.cs | 18 + .../Synapse.Correlator.csproj | 2 +- .../Synapse.Operator/Synapse.Operator.csproj | 2 +- .../Configuration/RunnerCloudEventOptions.cs | 9 +- .../Configuration/RunnerContainerOptions.cs | 2 +- .../Configuration/RunnerExecutionMode.cs | 35 + .../Configuration/RunnerLoggingOptions.cs | 30 + .../Configuration/RunnerOptions.cs | 10 + .../Configuration/WorkflowOptions.cs | 40 +- src/runner/Synapse.Runner/Program.cs | 110 +-- .../Properties/launchSettings.json | 3 +- src/runner/Synapse.Runner/RunnerDefaults.cs | 47 ++ ...s => ConnectedWorkflowExecutionContext.cs} | 43 +- .../Services/MemoryDocumentManager.cs | 67 ++ .../Services/RunnerApplication.cs | 85 +- .../StandAloneWorkflowExecutionContext.cs | 768 ++++++++++++++++++ .../Synapse.Runner/Synapse.Runner.csproj | 4 +- .../Synapse.Runner/WorkflowOutputFormat.cs | 33 + .../Synapse.Runtime.Abstractions.csproj | 2 +- .../Synapse.Runtime.Docker.csproj | 2 +- .../Synapse.Runtime.Kubernetes.csproj | 2 +- .../Synapse.Runtime.Native.csproj | 2 +- .../Cases/Conformance/ConformanceTestsBase.cs | 2 +- .../Services/MockDocumentApiClient.cs | 2 +- .../MockWorkflowExecutionContextFactory.cs | 2 +- 37 files changed, 1250 insertions(+), 96 deletions(-) create mode 100644 src/runner/Synapse.Runner/Configuration/RunnerExecutionMode.cs create mode 100644 src/runner/Synapse.Runner/Configuration/RunnerLoggingOptions.cs create mode 100644 src/runner/Synapse.Runner/RunnerDefaults.cs rename src/runner/Synapse.Runner/Services/{WorkflowExecutionContext.cs => ConnectedWorkflowExecutionContext.cs} (94%) create mode 100644 src/runner/Synapse.Runner/Services/MemoryDocumentManager.cs create mode 100644 src/runner/Synapse.Runner/Services/StandAloneWorkflowExecutionContext.cs create mode 100644 src/runner/Synapse.Runner/WorkflowOutputFormat.cs diff --git a/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj b/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj index 7339018f7..698aa3c87 100644 --- a/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj +++ b/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/api/Synapse.Api.Client.Core/Services/IDocumentApiClient.cs b/src/api/Synapse.Api.Client.Core/Services/IDocumentApiClient.cs index 5d5cd87a5..ed787525e 100644 --- a/src/api/Synapse.Api.Client.Core/Services/IDocumentApiClient.cs +++ b/src/api/Synapse.Api.Client.Core/Services/IDocumentApiClient.cs @@ -51,6 +51,6 @@ public interface IDocumentApiClient /// The id of the to delete /// A /// The with the specified id - Task DeletesAsync(string id, CancellationToken cancellationToken = default); + Task DeleteAsync(string id, CancellationToken cancellationToken = default); } diff --git a/src/api/Synapse.Api.Client.Core/Synapse.Api.Client.Core.csproj b/src/api/Synapse.Api.Client.Core/Synapse.Api.Client.Core.csproj index 36887c7af..038a3046f 100644 --- a/src/api/Synapse.Api.Client.Core/Synapse.Api.Client.Core.csproj +++ b/src/api/Synapse.Api.Client.Core/Synapse.Api.Client.Core.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/api/Synapse.Api.Client.Http/Services/DocumentHttpApiClient.cs b/src/api/Synapse.Api.Client.Http/Services/DocumentHttpApiClient.cs index 6a9f14165..cdd9b8478 100644 --- a/src/api/Synapse.Api.Client.Http/Services/DocumentHttpApiClient.cs +++ b/src/api/Synapse.Api.Client.Http/Services/DocumentHttpApiClient.cs @@ -68,7 +68,7 @@ public virtual async Task UpdateAsync(string id, object content, CancellationTok } /// - public virtual async Task DeletesAsync(string id, CancellationToken cancellationToken = default) + public virtual async Task DeleteAsync(string id, CancellationToken cancellationToken = default) { ArgumentException.ThrowIfNullOrWhiteSpace(id); var uri = $"{PathPrefix}/{id}"; diff --git a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj index ba893c9f5..e1837226a 100644 --- a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj +++ b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj b/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj index 56f696639..132ec7a17 100644 --- a/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj +++ b/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj @@ -8,7 +8,7 @@ Library True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj index 93a53e64c..676057672 100644 --- a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj +++ b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/cli/Synapse.Cli/Synapse.Cli.csproj b/src/cli/Synapse.Cli/Synapse.Cli.csproj index 6d62e0b36..8ceee1914 100644 --- a/src/cli/Synapse.Cli/Synapse.Cli.csproj +++ b/src/cli/Synapse.Cli/Synapse.Cli.csproj @@ -8,7 +8,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/core/Synapse.Core.Infrastructure.Containers.Docker/Synapse.Core.Infrastructure.Containers.Docker.csproj b/src/core/Synapse.Core.Infrastructure.Containers.Docker/Synapse.Core.Infrastructure.Containers.Docker.csproj index a7cea3b3f..b39a65485 100644 --- a/src/core/Synapse.Core.Infrastructure.Containers.Docker/Synapse.Core.Infrastructure.Containers.Docker.csproj +++ b/src/core/Synapse.Core.Infrastructure.Containers.Docker/Synapse.Core.Infrastructure.Containers.Docker.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/Synapse.Core.Infrastructure.Containers.Kubernetes.csproj b/src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/Synapse.Core.Infrastructure.Containers.Kubernetes.csproj index 449ba06bd..4d5fa6739 100644 --- a/src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/Synapse.Core.Infrastructure.Containers.Kubernetes.csproj +++ b/src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/Synapse.Core.Infrastructure.Containers.Kubernetes.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj index f4a3c6039..fedb03923 100644 --- a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj +++ b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/core/Synapse.Core/Synapse.Core.csproj b/src/core/Synapse.Core/Synapse.Core.csproj index b652a32b4..e1deacf55 100644 --- a/src/core/Synapse.Core/Synapse.Core.csproj +++ b/src/core/Synapse.Core/Synapse.Core.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/core/Synapse.Core/SynapseDefaults.cs b/src/core/Synapse.Core/SynapseDefaults.cs index 220af8557..54e4571c7 100644 --- a/src/core/Synapse.Core/SynapseDefaults.cs +++ b/src/core/Synapse.Core/SynapseDefaults.cs @@ -678,6 +678,24 @@ public static class Runner /// public const string Name = Prefix + "NAME"; + /// + /// Exposes constants about environment variables related to runner cloud events + /// + public static class CloudEvents + { + + /// + /// Gets the prefix for all environment variables related to runner cloud events + /// + public const string Prefix = Runner.Prefix + "CLOUD_EVENTS_"; + + /// + /// Gets the environment variable used to configure the runner's cloud event sink + /// + public const string Sink = Prefix + "SINK"; + + } + } /// diff --git a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj index e0f77911b..7046016bc 100644 --- a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj +++ b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj @@ -8,7 +8,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/operator/Synapse.Operator/Synapse.Operator.csproj b/src/operator/Synapse.Operator/Synapse.Operator.csproj index b6d786853..0374b6e36 100644 --- a/src/operator/Synapse.Operator/Synapse.Operator.csproj +++ b/src/operator/Synapse.Operator/Synapse.Operator.csproj @@ -8,7 +8,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/runner/Synapse.Runner/Configuration/RunnerCloudEventOptions.cs b/src/runner/Synapse.Runner/Configuration/RunnerCloudEventOptions.cs index 79432187f..4e69d8888 100644 --- a/src/runner/Synapse.Runner/Configuration/RunnerCloudEventOptions.cs +++ b/src/runner/Synapse.Runner/Configuration/RunnerCloudEventOptions.cs @@ -28,7 +28,9 @@ public class RunnerCloudEventOptions public RunnerCloudEventOptions() { var env = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents); - if(!string.IsNullOrWhiteSpace(env) && bool.TryParse(env, out var publishLifecycleEvents)) this.PublishLifecycleEvents = publishLifecycleEvents; + if (!string.IsNullOrWhiteSpace(env) && bool.TryParse(env, out var publishLifecycleEvents)) this.PublishLifecycleEvents = publishLifecycleEvents; + env = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.CloudEvents.Sink); + if (!string.IsNullOrWhiteSpace(env) && Uri.TryCreate(env, UriKind.RelativeOrAbsolute, out var sink)) this.Sink = sink; } /// @@ -36,4 +38,9 @@ public RunnerCloudEventOptions() /// public virtual bool PublishLifecycleEvents { get; set; } = true; + /// + /// Gets/sets the + /// + public virtual Uri? Sink { get; set; } + } diff --git a/src/runner/Synapse.Runner/Configuration/RunnerContainerOptions.cs b/src/runner/Synapse.Runner/Configuration/RunnerContainerOptions.cs index 31ad00d12..3ebe01420 100644 --- a/src/runner/Synapse.Runner/Configuration/RunnerContainerOptions.cs +++ b/src/runner/Synapse.Runner/Configuration/RunnerContainerOptions.cs @@ -29,7 +29,7 @@ public class RunnerContainerOptions public RunnerContainerOptions() { var env = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform)?.ToLowerInvariant(); - if (string.IsNullOrWhiteSpace(env)) throw new NullReferenceException("The runner's container platform must be configured"); + if (string.IsNullOrWhiteSpace(env)) env = ContainerPlatform.Docker; switch (env) { case ContainerPlatform.Docker: diff --git a/src/runner/Synapse.Runner/Configuration/RunnerExecutionMode.cs b/src/runner/Synapse.Runner/Configuration/RunnerExecutionMode.cs new file mode 100644 index 000000000..2f24e122d --- /dev/null +++ b/src/runner/Synapse.Runner/Configuration/RunnerExecutionMode.cs @@ -0,0 +1,35 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Runtime.Serialization; + +namespace Synapse.Runner.Configuration; + +/// +/// Enumerates all modes of execution for the Synapse Runner application +/// +public enum RunnerExecutionMode +{ + /// + /// The runner operates in a connected mode, interacting with the remote API. + /// In this mode, the runner depends on external services and APIs for its functionality. + /// + [EnumMember(Value = "connected")] + Connected, + /// + /// The runner operates in a standalone mode, functioning independently without relying on the remote API. + /// This mode is ideal for scenarios where external dependencies are unavailable or not required. + /// + [EnumMember(Value = "standalone")] + StandAlone +} \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Configuration/RunnerLoggingOptions.cs b/src/runner/Synapse.Runner/Configuration/RunnerLoggingOptions.cs new file mode 100644 index 000000000..942fc8a4e --- /dev/null +++ b/src/runner/Synapse.Runner/Configuration/RunnerLoggingOptions.cs @@ -0,0 +1,30 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Runtime.Serialization; + +namespace Synapse.Runner.Configuration; + +/// +/// Represents the options used to configure a Synapse Runner's logging +/// +[DataContract] +public class RunnerLoggingOptions +{ + + /// + /// Gets/sets the path to the file, if any, to output logs to + /// + public virtual string? OutputFilePath { get; set; } + +} \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Configuration/RunnerOptions.cs b/src/runner/Synapse.Runner/Configuration/RunnerOptions.cs index 94db483b8..5fc385464 100644 --- a/src/runner/Synapse.Runner/Configuration/RunnerOptions.cs +++ b/src/runner/Synapse.Runner/Configuration/RunnerOptions.cs @@ -23,6 +23,11 @@ namespace Synapse.Runner.Configuration; public class RunnerOptions { + /// + /// Gets/sets the runner's execution mode + /// + public RunnerExecutionMode ExecutionMode => string.IsNullOrWhiteSpace(this.Workflow.DefinitionFilePath) ? RunnerExecutionMode.Connected : RunnerExecutionMode.StandAlone; + /// /// Gets/sets the options used to configure the Synapse API the runner must use /// @@ -43,6 +48,11 @@ public class RunnerOptions /// public virtual RunnerContainerOptions Containers { get; set; } = new(); + /// + /// Gets/sets the options used to configure a Synapse Runner's logging + /// + public virtual RunnerLoggingOptions Logging { get; set; } = new(); + /// /// Gets the options used to configure the secrets of the Synapse Runner /// diff --git a/src/runner/Synapse.Runner/Configuration/WorkflowOptions.cs b/src/runner/Synapse.Runner/Configuration/WorkflowOptions.cs index e62a4fe51..ea994f23f 100644 --- a/src/runner/Synapse.Runner/Configuration/WorkflowOptions.cs +++ b/src/runner/Synapse.Runner/Configuration/WorkflowOptions.cs @@ -24,24 +24,52 @@ public class WorkflowOptions /// public WorkflowOptions() { - this.Instance = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Workflow.Instance)!; + this.InstanceQualifiedName = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Workflow.Instance)!; } /// - /// Gets/sets the qualified name of the workflow instance to run + /// Gets/sets the qualified name of the workflow instance to run. Required if the execution mode has been set to , otherwise ignored /// - public virtual string Instance { get; set; } + public virtual string? InstanceQualifiedName { get; set; } + + /// + /// Gets/sets the name of the definition's file, if any. Required if the execution mode has been set to , otherwise ignored + /// + public virtual string? DefinitionFilePath { get; set; } + + /// + /// Gets/sets the name of the file, if any, that defines the input of to the workflow to run. Ignored if the execution mode has been set to + /// + public virtual string? InputFilePath { get; set; } + + /// + /// Gets/sets the name of the file, if any, to write the workflow's output to. Ignored if the execution mode has been set to + /// + public virtual string? OutputFilePath { get; set; } + + /// + /// Gets/sets the workflow's output format. Ignored if execution mode has been set to + /// + public virtual WorkflowOutputFormat OutputFormat { get; set; } /// /// Gets the namespace of the workflow instance to run /// /// The namespace of the workflow instance to run - public virtual string GetInstanceNamespace() => this.Instance.Split('.', StringSplitOptions.RemoveEmptyEntries).Last(); + public virtual string GetInstanceNamespace() + { + if (string.IsNullOrWhiteSpace(this.InstanceQualifiedName)) throw new NullReferenceException("The instance qualified name is null or empty"); + return this.InstanceQualifiedName.Split('.', StringSplitOptions.RemoveEmptyEntries).Last(); + } /// /// Gets the name of the workflow instance to run /// /// The name of the workflow instance to run - public virtual string GetInstanceName() => this.Instance.Split('.', StringSplitOptions.RemoveEmptyEntries).First(); + public virtual string GetInstanceName() + { + if (string.IsNullOrWhiteSpace(this.InstanceQualifiedName)) throw new NullReferenceException("The instance qualified name is null or empty"); + return this.InstanceQualifiedName.Split('.', StringSplitOptions.RemoveEmptyEntries).First(); + } -} \ No newline at end of file +} diff --git a/src/runner/Synapse.Runner/Program.cs b/src/runner/Synapse.Runner/Program.cs index 1e225b925..292bd6632 100644 --- a/src/runner/Synapse.Runner/Program.cs +++ b/src/runner/Synapse.Runner/Program.cs @@ -11,7 +11,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Moq; using Neuroglia.Serialization.Xml; +using NReco.Logging.File; +using ServerlessWorkflow.Sdk.IO; using Synapse; using Synapse.Core.Infrastructure.Containers; @@ -23,50 +26,59 @@ config.AddJsonFile("appsettings.json", true, true); config.AddJsonFile($"appsettings.{context.HostingEnvironment.EnvironmentName}.json", true, true); config.AddEnvironmentVariables("SYNAPSE"); - config.AddCommandLine(args); + config.AddCommandLine(args, RunnerDefaults.CommandLine.SwitchMappings); config.AddKeyPerFile("/run/secrets/synapse", true, true); }) .ConfigureServices((context, services) => { - var options = new RunnerOptions(); - context.Configuration.Bind(options); + var runnerOptions = new RunnerOptions(); + context.Configuration.Bind(runnerOptions); services.Configure(context.Configuration); - services.AddLogging(builder => + switch (runnerOptions.ExecutionMode) { - builder.AddSimpleConsole(options => - { - options.TimestampFormat = "[HH:mm:ss] "; - }); - }); - services.AddSerialization(); - services.AddJsonSerializer(options => options.DefaultBufferSize = 128); - services.AddSingleton(); - services.AddJQExpressionEvaluator(); - services.AddJavaScriptExpressionEvaluator(); - services.AddNodeJSScriptExecutor(); - services.AddPythonScriptExecutor(); - services.AddSynapseHttpApiClient(http => - { - var configuration = new ServerlessWorkflow.Sdk.Models.Authentication.OpenIDConnectSchemeDefinition() - { - Authority = options.Api.BaseAddress, - Grant = OAuth2GrantType.ClientCredentials, - Client = new() + case RunnerExecutionMode.Connected: + services.AddSynapseHttpApiClient(http => { - Id = options.ServiceAccount.Name, - Secret = options.ServiceAccount.Key - }, - Scopes = ["api"] - }; - http.BaseAddress = options.Api.BaseAddress; - http.TokenFactory = async provider => - { - var token = await provider.GetRequiredService().GetTokenAsync(configuration); - if (string.IsNullOrWhiteSpace(token.AccessToken)) throw new NullReferenceException("The access token cannot be null"); - return token.AccessToken; - }; - }); - switch (options.Containers.Platform) + var configuration = new ServerlessWorkflow.Sdk.Models.Authentication.OpenIDConnectSchemeDefinition() + { + Authority = runnerOptions.Api.BaseAddress, + Grant = OAuth2GrantType.ClientCredentials, + Client = new() + { + Id = runnerOptions.ServiceAccount.Name, + Secret = runnerOptions.ServiceAccount.Key + }, + Scopes = ["api"] + }; + http.BaseAddress = runnerOptions.Api.BaseAddress; + http.TokenFactory = async provider => + { + var token = await provider.GetRequiredService().GetTokenAsync(configuration); + if (string.IsNullOrWhiteSpace(token.AccessToken)) throw new NullReferenceException("The access token cannot be null"); + return token.AccessToken; + }; + }); + services.AddLogging(builder => + { + builder.AddSimpleConsole(options => + { + options.TimestampFormat = "[HH:mm:ss] "; + }); + }); + break; + case RunnerExecutionMode.StandAlone: + services.AddScoped(provider => new Mock() { DefaultValue = DefaultValue.Mock }.Object); + services.AddHttpClient(); + services.AddLogging(builder => + { + builder.ClearProviders(); + if(!string.IsNullOrWhiteSpace(runnerOptions.Logging.OutputFilePath)) builder.AddFile(runnerOptions.Logging.OutputFilePath); + }); + break; + default: + throw new NotSupportedException($"The specified runner execution mode '{runnerOptions.ExecutionMode}' is not supported"); + } + switch (runnerOptions.Containers.Platform) { case ContainerPlatform.Docker: services.AddDockerContainerPlatform(); @@ -74,9 +86,17 @@ case ContainerPlatform.Kubernetes: services.AddKubernetesContainerPlatform(); break; - default: - throw new NotSupportedException($"The specified container platform '{options.Containers.Platform}' is not supported"); + default: + throw new NotSupportedException($"The specified container platform '{runnerOptions.Containers.Platform}' is not supported"); } + services.AddSerialization(); + services.AddJsonSerializer(options => options.DefaultBufferSize = 128); + services.AddSingleton(); + services.AddJQExpressionEvaluator(); + services.AddJavaScriptExpressionEvaluator(); + services.AddServerlessWorkflowIO(); + services.AddNodeJSScriptExecutor(); + services.AddPythonScriptExecutor(); services.AddSingleton(); services.AddSingleton(provider => provider.GetRequiredService()); services.AddSingleton(provider => provider.GetRequiredService()); @@ -89,17 +109,13 @@ services.AddSingleton(); services.AddSingleton(); services.AddHostedService(); - - if (!options.Certificates.Validate) + if (!runnerOptions.Certificates.Validate) services.ConfigureHttpClientDefaults(httpClient => { - services.ConfigureHttpClientDefaults(httpClient => + httpClient.ConfigurePrimaryHttpMessageHandler(() => new HttpClientHandler() { - httpClient.ConfigurePrimaryHttpMessageHandler(() => new HttpClientHandler() - { - ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator - }); + ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator }); - } + }); }); using var app = builder.Build(); diff --git a/src/runner/Synapse.Runner/Properties/launchSettings.json b/src/runner/Synapse.Runner/Properties/launchSettings.json index f31c499e7..23f75d42e 100644 --- a/src/runner/Synapse.Runner/Properties/launchSettings.json +++ b/src/runner/Synapse.Runner/Properties/launchSettings.json @@ -1,7 +1,8 @@ { "profiles": { "Synapse.Runner": { - "commandName": "Project" + "commandName": "Project", + "commandLineArgs": "-w definition.yaml -i input.yaml -f yaml" }, "Container (Dockerfile)": { "commandName": "Docker" diff --git a/src/runner/Synapse.Runner/RunnerDefaults.cs b/src/runner/Synapse.Runner/RunnerDefaults.cs new file mode 100644 index 000000000..0195a86c3 --- /dev/null +++ b/src/runner/Synapse.Runner/RunnerDefaults.cs @@ -0,0 +1,47 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse; + +/// +/// Exposes constants about the Synapse Runner application +/// +public static class RunnerDefaults +{ + + /// + /// Exposes constants about the Synapse Runner command line + /// + public static class CommandLine + { + + /// + /// Gets the Runner's command line arguments switch mappings + /// + public static readonly IDictionary SwitchMappings = new Dictionary() + { + { "--workflow", $"{nameof(RunnerOptions.Workflow)}:{nameof(WorkflowOptions.DefinitionFilePath)}" }, + { "-w", $"{nameof(RunnerOptions.Workflow)}:{nameof(WorkflowOptions.DefinitionFilePath)}" }, + { "--input", $"{nameof(RunnerOptions.Workflow)}:{nameof(WorkflowOptions.InputFilePath)}" }, + { "-i", $"{nameof(RunnerOptions.Workflow)}:{nameof(WorkflowOptions.InputFilePath)}" }, + { "--format", $"{nameof(RunnerOptions.Workflow)}:{nameof(WorkflowOptions.OutputFormat)}" }, + { "-f", $"{nameof(RunnerOptions.Workflow)}:{nameof(WorkflowOptions.OutputFormat)}" }, + { "--output", $"{nameof(RunnerOptions.Workflow)}:{nameof(WorkflowOptions.OutputFilePath)}" }, + { "-o", $"{nameof(RunnerOptions.Workflow)}:{nameof(WorkflowOptions.OutputFilePath)}" }, + { "--logs", $"{nameof(RunnerOptions.Logging)}:{nameof(RunnerLoggingOptions.OutputFilePath)}" }, + { "-l", $"{nameof(RunnerOptions.Logging)}:{nameof(RunnerLoggingOptions.OutputFilePath)}" } + }; + + } + +} diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs similarity index 94% rename from src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs rename to src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs index 4b1aa0f93..501b7c4ce 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs @@ -20,27 +20,32 @@ using Synapse.Events.Tasks; using Synapse.Events.Workflows; using System.Net.Mime; -using System.Runtime.CompilerServices; namespace Synapse.Runner.Services; /// -/// Represents the default in-memory implementation of the +/// Represents a connected implementation of the interface /// /// The current +/// The service used to perform logging /// The service used to evaluate runtime expressions /// The service used to serialize/deserialize objects to/from JSON /// The service used to interact with the Synapse API /// The service used to access the current /// The of the to execute /// The to execute -public class WorkflowExecutionContext(IServiceProvider services, IExpressionEvaluator expressionEvaluator, IJsonSerializer jsonSerializer, ISynapseApiClient api, IOptions options, WorkflowDefinition definition, WorkflowInstance instance) +public class ConnectedWorkflowExecutionContext(IServiceProvider services, ILogger logger, IExpressionEvaluator expressionEvaluator, IJsonSerializer jsonSerializer, ISynapseApiClient api, IOptions options, WorkflowDefinition definition, WorkflowInstance instance) : IWorkflowExecutionContext { /// public IServiceProvider Services { get; } = services; + /// + /// Gets the service used to perform logging + /// + protected ILogger Logger { get; } = logger; + /// public IExpressionEvaluator Expressions { get; } = expressionEvaluator; @@ -109,14 +114,14 @@ public virtual async Task CreateTaskAsync(TaskDefinition definitio else if (!string.IsNullOrWhiteSpace(this.Instance.Status?.ContextReference)) { contextReference = this.Instance.Status?.ContextReference; - var contextDocument = await this.Api.Documents.GetAsync(this.Instance.Status!.ContextReference, cancellationToken).ConfigureAwait(false); + var contextDocument = await this.Documents.GetAsync(this.Instance.Status!.ContextReference, cancellationToken).ConfigureAwait(false); context = contextDocument?.Content.ConvertTo>() ?? new Dictionary(); } else throw new NullReferenceException($"Failed to find the data document with id '{this.Instance.Status!.ContextReference}'"); } else { - var contextDocument = await this.Api.Documents.CreateAsync($"{reference}/input", context, cancellationToken).ConfigureAwait(false); + var contextDocument = await this.Documents.CreateAsync($"{reference}/input", context, cancellationToken).ConfigureAwait(false); contextReference = contextDocument.Id; } var filteredInput = input; @@ -128,7 +133,7 @@ public virtual async Task CreateTaskAsync(TaskDefinition definitio }; if (definition.Input?.From is string fromExpression) filteredInput = (await this.Expressions.EvaluateAsync(fromExpression, input, evaluationArguments, cancellationToken).ConfigureAwait(false))!; else if (definition.Input?.From != null) filteredInput = (await this.Expressions.EvaluateAsync(definition.Input.From, input, evaluationArguments, cancellationToken).ConfigureAwait(false))!; - var inputDocument = await this.Api.Documents.CreateAsync($"{reference}/input", filteredInput, cancellationToken).ConfigureAwait(false); + var inputDocument = await this.Documents.CreateAsync($"{reference}/input", filteredInput, cancellationToken).ConfigureAwait(false); var task = new TaskInstance() { Name = name, @@ -159,18 +164,20 @@ public virtual async Task CreateTaskAsync(TaskDefinition definitio CreatedAt = task.CreatedAt } }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Task '{reference}' created.", reference); return task; } /// public virtual async Task InitializeAsync(CancellationToken cancellationToken = default) { - var document = await this.Api.Documents.CreateAsync(this.Instance.GetQualifiedName(), this.Instance.Spec.Input ?? [], cancellationToken).ConfigureAwait(false); + var document = await this.Documents.CreateAsync(this.Instance.GetQualifiedName(), this.Instance.Spec.Input ?? [], cancellationToken).ConfigureAwait(false); var jsonPatch = new JsonPatch(PatchOperation.Add(JsonPointer.Create(w => w.Status!).ToCamelCase(), this.JsonSerializer.SerializeToNode(new WorkflowInstanceStatus() { ContextReference = document.Id }))); this.Instance = await this.Api.WorkflowInstances.PatchStatusAsync(this.Instance.GetName(), Instance.GetNamespace()!, new Patch(PatchType.JsonPatch, jsonPatch), null, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Workflow initialized."); } /// @@ -178,6 +185,7 @@ public virtual Task InitializeAsync(TaskInstance task, Cancellatio { //task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); //task.Status = TaskInstanceStatus.Initializing; + this.Logger.LogInformation("Task '{reference}' initialized.", task.Reference); return Task.FromResult(task); } @@ -230,6 +238,7 @@ public virtual async Task StartAsync(CancellationToken cancellationToken = defau StartedAt = this.Instance.Status?.StartedAt ?? DateTimeOffset.Now } }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Workflow started."); } /// @@ -260,6 +269,7 @@ public virtual async Task StartAsync(TaskInstance task, Cancellati StartedAt = task.StartedAt ?? DateTimeOffset.Now } }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Task '{reference}' started.", task.Reference); return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); } @@ -275,7 +285,7 @@ public virtual async Task ResumeAsync(CancellationToken cancellationToken = defa this.Instance.Status.Runs.Add(new() { StartedAt = DateTimeOffset.Now }); var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(originalInstance, this.Instance); this.Instance = await this.Api.WorkflowInstances.PatchStatusAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, new Patch(PatchType.JsonPatch, jsonPatch), null, cancellationToken).ConfigureAwait(false); - this.ContextData = (await this.Api.Documents.GetAsync(this.Instance.Status!.ContextReference, cancellationToken).ConfigureAwait(false)).Content.ConvertTo>()!; + this.ContextData = (await this.Documents.GetAsync(this.Instance.Status!.ContextReference, cancellationToken).ConfigureAwait(false)).Content.ConvertTo>()!; if (this.Options.CloudEvents.PublishLifecycleEvents) await this.Api.Events.PublishAsync(new CloudEvent() { SpecVersion = CloudEventSpecVersion.V1.Version, @@ -291,6 +301,7 @@ public virtual async Task ResumeAsync(CancellationToken cancellationToken = defa ResumedAt = this.Instance.Status?.Runs?.LastOrDefault()?.StartedAt ?? DateTimeOffset.Now } }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("The workflow's execution has been resumed."); } /// @@ -406,6 +417,7 @@ public virtual async Task RetryAsync(TaskInstance task, Error caus ArgumentNullException.ThrowIfNull(task); ArgumentNullException.ThrowIfNull(cause); using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Retrying task '{reference}'...", task.Reference); var originalInstance = this.Instance.Clone(); task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); task.Retries ??= []; @@ -559,7 +571,7 @@ public virtual async Task SetResultAsync(object? result, CancellationToken cance this.Instance.Status ??= new(); this.Instance.Status.Phase = WorkflowInstanceStatusPhase.Completed; this.Instance.Status.EndedAt = DateTimeOffset.Now; - var document = await this.Api.Documents.CreateAsync($"{this.Instance.GetQualifiedName()}/output", result, cancellationToken).ConfigureAwait(false); + var document = await this.Documents.CreateAsync($"{this.Instance.GetQualifiedName()}/output", result, cancellationToken).ConfigureAwait(false); this.Instance.Status.OutputReference = document.Id; this.Output = document.Content; var run = this.Instance.Status.Runs?.LastOrDefault(); @@ -602,6 +614,7 @@ await this.Api.Events.PublishAsync(new CloudEvent() } }, cancellationToken).ConfigureAwait(false); } + this.Logger.LogInformation("Workflow successfully executed."); } /// @@ -614,7 +627,7 @@ public virtual async Task SetResultAsync(TaskInstance task, object task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); task.Status = TaskInstanceStatus.Completed; task.EndedAt = DateTimeOffset.Now; - var document = await this.Api.Documents.CreateAsync($"{this.Instance.GetQualifiedName()}/{task.Reference}/output", result, cancellationToken).ConfigureAwait(false); + var document = await this.Documents.CreateAsync($"{this.Instance.GetQualifiedName()}/{task.Reference}/output", result, cancellationToken).ConfigureAwait(false); task.OutputReference = document.Id; task.Next = then; var run = task.Runs!.Last(); @@ -658,6 +671,7 @@ await this.Api.Events.PublishAsync(new CloudEvent() } }, cancellationToken).ConfigureAwait(false); } + this.Logger.LogInformation("Task '{reference}' successfully executed.", task.Reference); return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); } @@ -667,7 +681,7 @@ public virtual async Task SkipAsync(TaskInstance task, object? res ArgumentNullException.ThrowIfNull(task); using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); var originalInstance = this.Instance.Clone(); - var document = await this.Api.Documents.CreateAsync($"{this.Instance.GetQualifiedName()}/{task.Reference}/output", result ?? new(), cancellationToken).ConfigureAwait(false); + var document = await this.Documents.CreateAsync($"{this.Instance.GetQualifiedName()}/{task.Reference}/output", result ?? new(), cancellationToken).ConfigureAwait(false); result ??= new(); task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); task.Status = TaskInstanceStatus.Skipped; @@ -692,6 +706,7 @@ public virtual async Task SkipAsync(TaskInstance task, object? res SkippedAt = task.EndedAt ?? DateTimeOffset.Now } }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("The execution of task '{reference}' has been skipped.", task.Reference); return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); } @@ -722,6 +737,7 @@ public virtual async Task SuspendAsync(CancellationToken cancellationToken = def SuspendedAt = run?.EndedAt ?? DateTimeOffset.Now } }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("The workflow's execution has been suspended."); } /// @@ -753,6 +769,7 @@ public virtual async Task SuspendAsync(TaskInstance task, Cancella SuspendedAt = run?.EndedAt ?? DateTimeOffset.Now } }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("The execution of task '{reference}' has been suspended.", task.Reference); return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); } @@ -804,6 +821,7 @@ await this.Api.Events.PublishAsync(new CloudEvent() } }, cancellationToken).ConfigureAwait(false); } + this.Logger.LogInformation("The workflow's execution cancelled."); } /// @@ -856,6 +874,7 @@ await this.Api.Events.PublishAsync(new CloudEvent() } }, cancellationToken).ConfigureAwait(false); } + this.Logger.LogInformation("The execution of task '{reference}' has been cancelled.", task.Reference); return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); } @@ -887,4 +906,4 @@ protected virtual async Task EndAsync(CancellationToken cancellationToken) } } -} +} \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Services/MemoryDocumentManager.cs b/src/runner/Synapse.Runner/Services/MemoryDocumentManager.cs new file mode 100644 index 000000000..1264a29e7 --- /dev/null +++ b/src/runner/Synapse.Runner/Services/MemoryDocumentManager.cs @@ -0,0 +1,67 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse.Runner.Services; + +/// +/// Represents an in-memory implementation of the interface +/// +public class MemoryDocumentManager + : IDocumentApiClient +{ + + /// + /// Gets a key/value mapping of all documents stored in memory + /// + protected IDictionary Documents { get; } = new Dictionary(); + + /// + public virtual Task CreateAsync(string name, object content, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentNullException.ThrowIfNull(content); + var document = new Document() + { + Name = name, + Content = content + }; + this.Documents[document.Id] = document; + return Task.FromResult(document); + } + + /// + public virtual Task GetAsync(string id, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(id); + return Task.FromResult(this.Documents[id]); + } + + /// + public virtual Task UpdateAsync(string id, object content, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(id); + ArgumentNullException.ThrowIfNull(content); + var document = this.Documents[id]; + document.Content = content; + return Task.CompletedTask; + } + + /// + public virtual Task DeleteAsync(string id, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(id); + this.Documents.Remove(id); + return Task.CompletedTask; + } + +} \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Services/RunnerApplication.cs b/src/runner/Synapse.Runner/Services/RunnerApplication.cs index 1a7f9ec2d..1c00912a9 100644 --- a/src/runner/Synapse.Runner/Services/RunnerApplication.cs +++ b/src/runner/Synapse.Runner/Services/RunnerApplication.cs @@ -11,7 +11,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +using MimeKit; +using Neuroglia; using Neuroglia.Data.Infrastructure.ResourceOriented; +using ServerlessWorkflow.Sdk.IO; +using System.Net.Http.Headers; +using System.Net.Mime; namespace Synapse.Runner.Services; @@ -53,6 +58,21 @@ internal class RunnerApplication(IServiceProvider serviceProvider, IHostApplicat /// protected ISynapseApiClient ApiClient => this.ServiceProvider.GetRequiredService(); + /// + /// Gets the service used to read s + /// + protected IWorkflowDefinitionReader WorkflowDefinitionReader => this.ServiceProvider.GetRequiredService(); + + /// + /// Gets the service used to serialize/deserialize data to/from JSON + /// + protected IJsonSerializer JsonSerializer => this.ServiceProvider.GetRequiredService(); + + /// + /// Gets the service used to serialize/deserialize data to/from YAML + /// + protected IYamlSerializer YamlSerializer => this.ServiceProvider.GetRequiredService(); + /// /// Gets the service used to access the current /// @@ -87,15 +107,68 @@ protected virtual async Task RunAsync(CancellationToken cancellationToken) { try { - if (string.IsNullOrWhiteSpace(this.Options.Workflow?.Instance)) throw new NullReferenceException("The workflow instance to run must be configured, which can be done using the application's appsettings.json file, using command line arguments or using environment variables"); - var instance = await this.ApiClient.WorkflowInstances.GetAsync(this.Options.Workflow.GetInstanceName(), this.Options.Workflow.GetInstanceNamespace(), cancellationToken).ConfigureAwait(false) ?? throw new NullReferenceException($"Failed to find the specified workflow instance '{this.Options.Workflow.Instance}'"); - var resource = await this.ApiClient.Workflows.GetAsync(instance.Spec.Definition.Name, instance.Spec.Definition.Namespace, cancellationToken).ConfigureAwait(false) ?? throw new NullReferenceException($"Failed to find the specified workflow '{instance.Spec.Definition.Namespace}.{instance.Spec.Definition.Name}'"); - var definition = resource.Spec.Versions.FirstOrDefault(v => v.Document.Version == instance.Spec.Definition.Version) ?? throw new NullReferenceException($"Failed to find the specified version '{instance.Spec.Definition.Version}' of the workflow '{instance.Spec.Definition.Namespace}.{instance.Spec.Definition.Name}'"); + WorkflowDefinition definition; + WorkflowInstance instance; + Type executionContextType; + switch (this.Options.ExecutionMode) + { + case RunnerExecutionMode.Connected: + if (string.IsNullOrWhiteSpace(this.Options.Workflow.InstanceQualifiedName)) throw new NullReferenceException("The workflow instance to run must be configured, which can be done using the application's appsettings.json file, using command line arguments or using environment variables"); + instance = await this.ApiClient.WorkflowInstances.GetAsync(this.Options.Workflow.GetInstanceName(), this.Options.Workflow.GetInstanceNamespace(), cancellationToken).ConfigureAwait(false) ?? throw new NullReferenceException($"Failed to find the specified workflow instance '{this.Options.Workflow.InstanceQualifiedName}'"); + var resource = await this.ApiClient.Workflows.GetAsync(instance.Spec.Definition.Name, instance.Spec.Definition.Namespace, cancellationToken).ConfigureAwait(false) ?? throw new NullReferenceException($"Failed to find the specified workflow '{instance.Spec.Definition.Namespace}.{instance.Spec.Definition.Name}'"); + definition = resource.Spec.Versions.FirstOrDefault(v => v.Document.Version == instance.Spec.Definition.Version) ?? throw new NullReferenceException($"Failed to find the specified version '{instance.Spec.Definition.Version}' of the workflow '{instance.Spec.Definition.Namespace}.{instance.Spec.Definition.Name}'"); + executionContextType = typeof(ConnectedWorkflowExecutionContext); + break; + case RunnerExecutionMode.StandAlone: + if (string.IsNullOrWhiteSpace(this.Options.Workflow.DefinitionFilePath)) throw new NullReferenceException("The path to the workflow definition file must be set in stand-alone execution mode"); + if (!File.Exists(this.Options.Workflow.DefinitionFilePath)) throw new FileNotFoundException("The workflow definition file does not exist or cannot be found", this.Options.Workflow.DefinitionFilePath); + var stream = new FileStream(this.Options.Workflow.DefinitionFilePath, FileMode.Open); + var readerOptions = new WorkflowDefinitionReaderOptions(); + definition = await this.WorkflowDefinitionReader.ReadAsync(stream, readerOptions, cancellationToken).ConfigureAwait(false); + await stream.DisposeAsync().ConfigureAwait(false); + IDictionary? input = null; + if (!string.IsNullOrWhiteSpace(this.Options.Workflow.InputFilePath)) + { + var inputFile = new FileInfo(this.Options.Workflow.InputFilePath); + if (!inputFile.Exists) throw new FileNotFoundException("The workflow input file does not exist or cannot be found", this.Options.Workflow.InputFilePath); + var extension = inputFile.Extension.ToLowerInvariant().Split('.', StringSplitOptions.RemoveEmptyEntries).Last(); + stream = inputFile.OpenRead(); + input = extension switch + { + "json" => this.JsonSerializer.Deserialize>(stream), + "yaml" or "yml" => this.YamlSerializer.Deserialize>(stream), + _ => throw new NotSupportedException($"The workflow input file extension '{extension}' is not supported. Supported extensions are '.json', '.yaml' and '.yml'"), + }; + await stream.DisposeAsync().ConfigureAwait(false); + } + instance = new WorkflowInstance() + { + Metadata = new() + { + Namespace = definition.Document.Namespace, + Name = $"{definition.Document.Name}-{Guid.NewGuid().ToShortString()}" + }, + Spec = new() + { + Definition = new() + { + Namespace = definition.Document.Namespace, + Name = definition.Document.Name, + Version = definition.Document.Version + }, + Input = input == null ? null : new(input) + } + }; + executionContextType = typeof(StandAloneWorkflowExecutionContext); + break; + default: + throw new NotSupportedException($"The specified runner execution mode '{this.Options.ExecutionMode}' is not supported"); + } var expressionLanguage = definition.Evaluate?.Language ?? RuntimeExpressions.Languages.JQ; var expressionEvaluator = this.ServiceProvider.GetRequiredService().GetEvaluator(expressionLanguage) ?? throw new NullReferenceException($"Failed to find an expression evaluator for the language '{expressionLanguage}' defined by workflow '{instance.Spec.Definition.Namespace}.{instance.Spec.Definition.Name}:{instance.Spec.Definition.Version}'"); - var context = ActivatorUtilities.CreateInstance(this.ServiceProvider, expressionEvaluator, definition, instance); - this.Events = this.ApiClient.WorkflowInstances.MonitorAsync(this.Options.Workflow.GetInstanceName(), this.Options.Workflow.GetInstanceNamespace(), cancellationToken).ToObservable(); + var context = (IWorkflowExecutionContext)ActivatorUtilities.CreateInstance(this.ServiceProvider, executionContextType, expressionEvaluator, definition, instance); + this.Events = this.ApiClient.WorkflowInstances.MonitorAsync(instance.Metadata.Name!, instance.Metadata.Namespace!, cancellationToken).ToObservable(); this.Events .Where(e => e.Type == ResourceWatchEventType.Updated && e.Resource.Status?.Phase != context.Instance.Status?.Phase) .Select(e => e.Resource.Status?.Phase) diff --git a/src/runner/Synapse.Runner/Services/StandAloneWorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/StandAloneWorkflowExecutionContext.cs new file mode 100644 index 000000000..39e5b7cc9 --- /dev/null +++ b/src/runner/Synapse.Runner/Services/StandAloneWorkflowExecutionContext.cs @@ -0,0 +1,768 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Json.Pointer; +using Moq; +using Neuroglia; +using Neuroglia.Data.Expressions; +using Neuroglia.Data.Infrastructure.ResourceOriented; +using Synapse.Events.Tasks; +using Synapse.Events.Workflows; +using System.Net.Mime; +using System.Text; +using System.Text.Json; + +namespace Synapse.Runner.Services; + +/// +/// Represents a stand-alone, in-memory implementation of the interface +/// +/// The current +/// The service used to perform logging +/// The service used to evaluate runtime expressions +/// The service used to serialize/deserialize data to/from JSON +/// The service used to serialize/deserialize data to/from YAML +/// The service used to access the current +/// The service used to perform HTTP requests +/// The of the to execute +/// The to execute +public class StandAloneWorkflowExecutionContext(IServiceProvider services, ILogger logger, IExpressionEvaluator expressionEvaluator, IJsonSerializer jsonSerializer, IYamlSerializer yamlSerializer, IOptions options, HttpClient httpClient, WorkflowDefinition definition, WorkflowInstance instance) + : IWorkflowExecutionContext +{ + + readonly JsonSerializerOptions _jsonSerializerOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + WriteIndented = true + }; + + /// + public IServiceProvider Services { get; } = services; + + /// + /// Gets the service used to perform logging + /// + protected ILogger Logger { get; } = logger; + + /// + public IExpressionEvaluator Expressions { get; } = expressionEvaluator; + + /// + /// Gets the service used to serialize/deserialize data to/from JSON + /// + protected IJsonSerializer JsonSerializer { get; } = jsonSerializer; + + /// + /// Gets the service used to serialize/deserialize data to/from YAML + /// + protected IYamlSerializer YamlSerializer { get; } = yamlSerializer; + + /// + /// Gets the service used to perform HTTP requests + /// + protected HttpClient HttpClient { get; } = httpClient; + + /// + /// Gets the current + /// + protected RunnerOptions Options { get; } = options.Value; + + /// + public WorkflowDefinition Definition { get; } = definition; + + /// + public WorkflowInstance Instance { get; set; } = instance; + + /// + public IDocumentApiClient Documents { get; } = new MemoryDocumentManager(); + + /// + public IClusterResourceApiClient CustomFunctions => Mock.Of>(); + + /// + /// Gets the object used to asynchronously lock the + /// + protected AsyncLock Lock { get; } = new(); + + /// + public IDictionary ContextData { get; protected set; } = new Dictionary(); + + /// + public IDictionary Arguments { get; protected set; } = new Dictionary(); + + /// + public object? Output { get; protected set; } + + /// + public virtual Task ContinueWithAsync(TaskDefinition task, CancellationToken cancellationToken = default) => Task.CompletedTask; + + /// + public virtual async Task CreateTaskAsync(TaskDefinition definition, string? path, object input, IDictionary? context = null, ITaskExecutionContext? parent = null, bool isExtension = false, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(definition); + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + input ??= new(); + var name = string.IsNullOrWhiteSpace(path) + ? parent?.Instance.Reference?.OriginalString.Split('/', StringSplitOptions.RemoveEmptyEntries).Last() + : path.Split('/', StringSplitOptions.RemoveEmptyEntries).Last(); + var reference = this.Definition.BuildReferenceTo(definition, path, parent?.Instance.Reference); + var contextReference = string.Empty; + if (context == null) + { + if (parent?.ContextData != null) + { + contextReference = parent.Instance.ContextReference; + context = parent.ContextData; + } + else if (!string.IsNullOrWhiteSpace(this.Instance.Status?.ContextReference)) + { + contextReference = this.Instance.Status?.ContextReference; + var contextDocument = await this.Documents.GetAsync(this.Instance.Status!.ContextReference, cancellationToken).ConfigureAwait(false); + context = contextDocument?.Content.ConvertTo>() ?? new Dictionary(); + } + else throw new NullReferenceException($"Failed to find the data document with id '{this.Instance.Status!.ContextReference}'"); + } + else + { + var contextDocument = await this.Documents.CreateAsync($"{reference}/input", context, cancellationToken).ConfigureAwait(false); + contextReference = contextDocument.Id; + } + var filteredInput = input; + var evaluationArguments = new Dictionary() + { + { RuntimeExpressions.Arguments.Runtime, RuntimeDescriptor.Current }, + { RuntimeExpressions.Arguments.Workflow, this.GetDescriptor() }, + { RuntimeExpressions.Arguments.Context, context } + }; + if (definition.Input?.From is string fromExpression) filteredInput = (await this.Expressions.EvaluateAsync(fromExpression, input, evaluationArguments, cancellationToken).ConfigureAwait(false))!; + else if (definition.Input?.From != null) filteredInput = (await this.Expressions.EvaluateAsync(definition.Input.From, input, evaluationArguments, cancellationToken).ConfigureAwait(false))!; + var inputDocument = await this.Documents.CreateAsync($"{reference}/input", filteredInput, cancellationToken).ConfigureAwait(false); + var task = new TaskInstance() + { + Name = name, + Reference = reference, + IsExtension = isExtension, + ParentId = parent?.Instance.Id, + InputReference = inputDocument.Id, + ContextReference = contextReference + }; + var pointer = JsonPointer.Create(w => w.Status!.Tasks!).ToCamelCase(); + if (this.Instance.Status?.Tasks != null) pointer = JsonPointer.Parse($"{pointer}/-"); + this.Instance.Status ??= new(); + this.Instance.Status.Tasks ??= []; + this.Instance.Status.Tasks.Add(task); + if (this.Options.CloudEvents.PublishLifecycleEvents) await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Created.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new TaskCreatedEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + CreatedAt = task.CreatedAt + } + }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Task '{reference}' created.", reference); + return task; + } + + /// + public virtual async Task InitializeAsync(CancellationToken cancellationToken = default) + { + var document = await this.Documents.CreateAsync(this.Instance.GetQualifiedName(), this.Instance.Spec.Input ?? [], cancellationToken).ConfigureAwait(false); + this.Instance.Status = new() + { + ContextReference = document.Id + }; + this.Logger.LogInformation("Workflow initialized."); + } + + /// + public virtual Task InitializeAsync(TaskInstance task, CancellationToken cancellationToken = default) + { + //task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + //task.Status = TaskInstanceStatus.Initializing; + this.Logger.LogInformation("Task '{reference}' initialized.", task.Reference); + return Task.FromResult(task); + } + + /// + public virtual async Task StartAsync(CancellationToken cancellationToken = default) + { + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + this.Instance.Status ??= new(); + this.Instance.Status.Phase = WorkflowInstanceStatusPhase.Running; + this.Instance.Status.StartedAt ??= DateTimeOffset.Now; + this.Instance.Status.Runs ??= []; + this.Instance.Status.Runs.Add(new() { StartedAt = DateTimeOffset.Now }); + this.Instance.Status.ContextReference ??= (await this.Documents.CreateAsync(this.Instance.GetQualifiedName(), this.ContextData, cancellationToken).ConfigureAwait(false)).Id; + if (this.Options.CloudEvents.PublishLifecycleEvents) await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Workflow.Started.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new WorkflowStartedEventV1() + { + Name = this.Instance.GetQualifiedName(), + Definition = this.Instance.Spec.Definition, + StartedAt = this.Instance.Status?.StartedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Workflow started."); + } + + /// + public virtual async Task StartAsync(TaskInstance task, CancellationToken cancellationToken = default) + { + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + task.Status = TaskInstanceStatus.Running; + task.StartedAt ??= DateTimeOffset.Now; + task.Runs ??= []; + task.Runs.Add(new() { StartedAt = DateTimeOffset.Now }); + if (this.Options.CloudEvents.PublishLifecycleEvents) await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Started.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new TaskStartedEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + StartedAt = task.StartedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Task '{reference}' started.", task.Reference); + return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + } + + /// + public virtual async Task ResumeAsync(CancellationToken cancellationToken = default) + { + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + this.Instance.Status ??= new(); + this.Instance.Status.Phase = WorkflowInstanceStatusPhase.Running; + this.Instance.Status.StartedAt ??= DateTimeOffset.Now; + this.Instance.Status.Runs ??= []; + this.Instance.Status.Runs.Add(new() { StartedAt = DateTimeOffset.Now }); + this.ContextData = (await this.Documents.GetAsync(this.Instance.Status!.ContextReference, cancellationToken).ConfigureAwait(false)).Content.ConvertTo>()!; + if (this.Options.CloudEvents.PublishLifecycleEvents) await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Workflow.Resumed.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new WorkflowResumedEventV1() + { + Name = this.Instance.GetQualifiedName(), + ResumedAt = this.Instance.Status?.Runs?.LastOrDefault()?.StartedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("The workflow's execution has been resumed."); + } + + /// + public virtual Task CorrelateAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default) => throw new NotSupportedException("Event correlation is not supported in stand-alone execution mode"); + + /// + public virtual async Task PublishAsync(CloudEvent e, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(e); + if (this.Options.CloudEvents.Sink == null) return; + try + { + var json = this.JsonSerializer.SerializeToText(e); + using var request = new HttpRequestMessage(HttpMethod.Post, this.Options.CloudEvents.Sink) + { + Content = new StringContent(json, Encoding.UTF8, CloudEventContentType.Json) + }; + using var response = await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); + response.EnsureSuccessStatusCode(); + } + catch (Exception ex) + { + this.Logger.LogError("An error occurred while publishing a cloud event to the configured sink: {ex}", ex); + } + } + + /// + public virtual IAsyncEnumerable GetTasksAsync(CancellationToken cancellationToken = default) => (this.Instance.Status?.Tasks ?? []).ToAsyncEnumerable(); + + /// + public virtual IAsyncEnumerable GetTasksAsync(TaskInstance task, CancellationToken cancellationToken = default) => this.GetTasksAsync(cancellationToken).Where(t => t.ParentId == task.Id); + + /// + public virtual async Task RetryAsync(TaskInstance task, Error cause, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(task); + ArgumentNullException.ThrowIfNull(cause); + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Retrying task '{reference}'...", task.Reference); + task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + task.Retries ??= []; + task.Retries.Add(new RetryAttempt() + { + Number = (uint)task.Retries.Count + 1, + Cause = cause + }); + task.Status = TaskInstanceStatus.Running; + task.StartedAt ??= DateTimeOffset.Now; + task.Runs ??= []; + task.Runs.Add(new() { StartedAt = DateTimeOffset.Now }); + if (this.Options.CloudEvents.PublishLifecycleEvents) await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Retrying.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new RetryingTaskEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + RetryingAt = task.Runs?.LastOrDefault()?.StartedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + } + + /// + public virtual async Task SetErrorAsync(Error error, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(error); + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + this.Instance.Status ??= new(); + this.Instance.Status.Phase = WorkflowInstanceStatusPhase.Faulted; + this.Instance.Status.EndedAt = DateTimeOffset.Now; + this.Instance.Status.Error = error; + var run = this.Instance.Status.Runs?.LastOrDefault(); + if (run != null) run.EndedAt = DateTimeOffset.Now; + if (this.Options.CloudEvents.PublishLifecycleEvents) + { + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Workflow.Faulted.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new WorkflowFaultedEventV1() + { + Name = this.Instance.GetQualifiedName(), + Error = error, + FaultedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Workflow.Ended.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new WorkflowEndedEventV1() + { + Name = this.Instance.GetQualifiedName(), + Status = this.Instance.Status!.Phase!, + EndedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + } + } + + /// + public virtual async Task SetErrorAsync(TaskInstance task, Error error, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(task); + ArgumentNullException.ThrowIfNull(error); + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + task.Status = TaskInstanceStatus.Faulted; + task.EndedAt = DateTimeOffset.Now; + task.Error = error; + var run = task.Runs?.LastOrDefault(); + if (run != null) + { + run.EndedAt = DateTimeOffset.Now; + run.Outcome = task.Status; + } + if (this.Options.CloudEvents.PublishLifecycleEvents) + { + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Faulted.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new TaskFaultedEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + Error = error, + FaultedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Ended.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new TaskEndedEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + Status = task.Status!, + EndedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + } + return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + } + + /// + public virtual async Task SetResultAsync(object? result, CancellationToken cancellationToken = default) + { + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + result ??= new(); + this.Instance.Status ??= new(); + this.Instance.Status.Phase = WorkflowInstanceStatusPhase.Completed; + this.Instance.Status.EndedAt = DateTimeOffset.Now; + var document = await this.Documents.CreateAsync($"{this.Instance.GetQualifiedName()}/output", result, cancellationToken).ConfigureAwait(false); + this.Instance.Status.OutputReference = document.Id; + this.Output = result; + var run = this.Instance.Status.Runs?.LastOrDefault(); + if (run != null) run.EndedAt = DateTimeOffset.Now; + if (this.Options.CloudEvents.PublishLifecycleEvents) + { + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Workflow.Completed.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new WorkflowCompletedEventV1() + { + Name = this.Instance.GetQualifiedName(), + CompletedAt = run?.EndedAt ?? DateTimeOffset.Now, + Output = this.Output + } + }, cancellationToken).ConfigureAwait(false); + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Workflow.Ended.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new WorkflowEndedEventV1() + { + Name = this.Instance.GetQualifiedName(), + Status = this.Instance.Status!.Phase!, + EndedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + } + var output = this.Options.Workflow.OutputFormat switch + { + WorkflowOutputFormat.Json => System.Text.Json.JsonSerializer.Serialize(this.Output, this._jsonSerializerOptions), + WorkflowOutputFormat.Yaml => this.YamlSerializer.SerializeToText(this.Output), + _ => throw new NotSupportedException($"The specified workflow output format '{this.Options.Workflow.OutputFormat}' is not supported"), + }; + if (string.IsNullOrWhiteSpace(this.Options.Workflow.OutputFilePath)) Console.WriteLine(output); + else await File.WriteAllTextAsync(this.Options.Workflow.OutputFilePath, output, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Workflow successfully executed."); + } + + /// + public virtual async Task SetResultAsync(TaskInstance task, object? result, string? then = FlowDirective.Continue, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(task); + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + result ??= new(); + task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + task.Status = TaskInstanceStatus.Completed; + task.EndedAt = DateTimeOffset.Now; + var document = await this.Documents.CreateAsync($"{this.Instance.GetQualifiedName()}/{task.Reference}/output", result, cancellationToken).ConfigureAwait(false); + task.OutputReference = document.Id; + task.Next = then; + var run = task.Runs!.Last(); + run.EndedAt = DateTimeOffset.Now; + run.Outcome = task.Status; + if (this.Options.CloudEvents.PublishLifecycleEvents) + { + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Completed.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new TaskCompletedEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + CompletedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Ended.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new TaskEndedEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + Status = this.Instance.Status!.Phase!, + EndedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + } + var output = this.Options.Workflow.OutputFormat switch + { + WorkflowOutputFormat.Json => System.Text.Json.JsonSerializer.Serialize(result, this._jsonSerializerOptions), + WorkflowOutputFormat.Yaml => this.YamlSerializer.SerializeToText(result), + _ => throw new NotSupportedException($"The specified workflow output format '{this.Options.Workflow.OutputFormat}' is not supported"), + }; + this.Logger.LogInformation("Task '{reference}' successfully executed.", task.Reference); + return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + } + + /// + public virtual async Task SkipAsync(TaskInstance task, object? result, string? then = FlowDirective.Continue, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(task); + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + var document = await this.Documents.CreateAsync($"{this.Instance.GetQualifiedName()}/{task.Reference}/output", result ?? new(), cancellationToken).ConfigureAwait(false); + result ??= new(); + task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + task.Status = TaskInstanceStatus.Skipped; + task.EndedAt = DateTimeOffset.Now; + task.OutputReference = document.Id; + task.Next = then; + if (this.Options.CloudEvents.PublishLifecycleEvents) await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Skipped.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new TaskSkippedEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + SkippedAt = task.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("The execution of task '{reference}' has been skipped.", task.Reference); + return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + } + + /// + public virtual async Task SuspendAsync(CancellationToken cancellationToken = default) + { + if (this.Instance.Status?.Phase == WorkflowInstanceStatusPhase.Suspended) return; + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + this.Instance.Status ??= new(); + this.Instance.Status.Phase = WorkflowInstanceStatusPhase.Suspended; + var run = this.Instance.Status.Runs?.LastOrDefault(); + if (run != null) run.EndedAt = DateTimeOffset.Now; + if (this.Options.CloudEvents.PublishLifecycleEvents) await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Workflow.Suspended.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new WorkflowSuspendedEventV1() + { + Name = this.Instance.GetQualifiedName(), + SuspendedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("The workflow's execution has been suspended."); + } + + /// + public virtual async Task SuspendAsync(TaskInstance task, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(task); + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + task.Status = TaskInstanceStatus.Suspended; + var run = task.Runs!.Last(); + run.EndedAt = DateTimeOffset.Now; + run.Outcome = task.Status; + if (this.Options.CloudEvents.PublishLifecycleEvents) await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Suspended.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new TaskSuspendedEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + SuspendedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("The execution of task '{reference}' has been suspended.", task.Reference); + return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + } + + /// + public virtual async Task CancelAsync(CancellationToken cancellationToken = default) + { + if (this.Instance.Status?.Phase == WorkflowInstanceStatusPhase.Cancelled) return; + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + this.Instance.Status ??= new(); + this.Instance.Status.Phase = WorkflowInstanceStatusPhase.Cancelled; + this.Instance.Status.EndedAt = DateTimeOffset.Now; + var run = this.Instance.Status.Runs?.LastOrDefault(); + if (run != null) run.EndedAt = DateTimeOffset.Now; + if (this.Options.CloudEvents.PublishLifecycleEvents) + { + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Workflow.Cancelled.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new WorkflowCancelledEventV1() + { + Name = this.Instance.GetQualifiedName(), + CancelledAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Workflow.Ended.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new WorkflowEndedEventV1() + { + Name = this.Instance.GetQualifiedName(), + Status = this.Instance.Status!.Phase!, + EndedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + } + this.Logger.LogInformation("The workflow's execution cancelled."); + } + + /// + public virtual async Task CancelAsync(TaskInstance task, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(task); + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + task = this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + task.Status = TaskInstanceStatus.Cancelled; + task.EndedAt = DateTimeOffset.Now; + var run = task.Runs!.Last(); + run.EndedAt = DateTimeOffset.Now; + run.Outcome = task.Status; + if (this.Options.CloudEvents.PublishLifecycleEvents) + { + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Cancelled.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new TaskCancelledEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + CancelledAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + await this.PublishAsync(new CloudEvent() + { + SpecVersion = CloudEventSpecVersion.V1.Version, + Id = Guid.NewGuid().ToString(), + Time = DateTimeOffset.Now, + Source = this.Options.Api.BaseAddress, + Type = SynapseDefaults.CloudEvents.Task.Ended.v1, + Subject = this.Instance.GetQualifiedName(), + DataContentType = MediaTypeNames.Application.Json, + Data = new TaskEndedEventV1() + { + Workflow = this.Instance.GetQualifiedName(), + Task = task.Reference, + Status = this.Instance.Status!.Phase!, + EndedAt = run?.EndedAt ?? DateTimeOffset.Now + } + }, cancellationToken).ConfigureAwait(false); + } + this.Logger.LogInformation("The execution of task '{reference}' has been cancelled.", task.Reference); + return this.Instance.Status?.Tasks?.FirstOrDefault(t => t.Id == task.Id) ?? throw new NullReferenceException($"Failed to find the task instance with the specified id '{task.Id}'. Make sure the task instance resource has been created using the workflow context."); + } + +} diff --git a/src/runner/Synapse.Runner/Synapse.Runner.csproj b/src/runner/Synapse.Runner/Synapse.Runner.csproj index 3ef64dc1b..10b855089 100644 --- a/src/runner/Synapse.Runner/Synapse.Runner.csproj +++ b/src/runner/Synapse.Runner/Synapse.Runner.csproj @@ -8,7 +8,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors @@ -60,11 +60,13 @@ + + diff --git a/src/runner/Synapse.Runner/WorkflowOutputFormat.cs b/src/runner/Synapse.Runner/WorkflowOutputFormat.cs new file mode 100644 index 000000000..b87090543 --- /dev/null +++ b/src/runner/Synapse.Runner/WorkflowOutputFormat.cs @@ -0,0 +1,33 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Runtime.Serialization; + +namespace Synapse.Runner; + +/// +/// Exposes all supported workflow output formats +/// +public enum WorkflowOutputFormat +{ + /// + /// Indicates that the workflow output should be formatted to JSON + /// + [EnumMember(Value = "json")] + Json, + /// + /// Indicates that the workflow output should be formatted to YAML + /// + [EnumMember(Value = "json")] + Yaml +} \ No newline at end of file diff --git a/src/runtime/Synapse.Runtime.Abstractions/Synapse.Runtime.Abstractions.csproj b/src/runtime/Synapse.Runtime.Abstractions/Synapse.Runtime.Abstractions.csproj index b1ed12879..2494eadc6 100644 --- a/src/runtime/Synapse.Runtime.Abstractions/Synapse.Runtime.Abstractions.csproj +++ b/src/runtime/Synapse.Runtime.Abstractions/Synapse.Runtime.Abstractions.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/runtime/Synapse.Runtime.Docker/Synapse.Runtime.Docker.csproj b/src/runtime/Synapse.Runtime.Docker/Synapse.Runtime.Docker.csproj index b8d1d669a..ec2a3dab1 100644 --- a/src/runtime/Synapse.Runtime.Docker/Synapse.Runtime.Docker.csproj +++ b/src/runtime/Synapse.Runtime.Docker/Synapse.Runtime.Docker.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/runtime/Synapse.Runtime.Kubernetes/Synapse.Runtime.Kubernetes.csproj b/src/runtime/Synapse.Runtime.Kubernetes/Synapse.Runtime.Kubernetes.csproj index cdb1a5fdd..60e3a0896 100644 --- a/src/runtime/Synapse.Runtime.Kubernetes/Synapse.Runtime.Kubernetes.csproj +++ b/src/runtime/Synapse.Runtime.Kubernetes/Synapse.Runtime.Kubernetes.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/runtime/Synapse.Runtime.Native/Synapse.Runtime.Native.csproj b/src/runtime/Synapse.Runtime.Native/Synapse.Runtime.Native.csproj index 7cbfac499..a35678e15 100644 --- a/src/runtime/Synapse.Runtime.Native/Synapse.Runtime.Native.csproj +++ b/src/runtime/Synapse.Runtime.Native/Synapse.Runtime.Native.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.7 + alpha5.8 $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/tests/Synapse.UnitTests/Cases/Conformance/ConformanceTestsBase.cs b/tests/Synapse.UnitTests/Cases/Conformance/ConformanceTestsBase.cs index 1ad2cd23b..0474a4155 100644 --- a/tests/Synapse.UnitTests/Cases/Conformance/ConformanceTestsBase.cs +++ b/tests/Synapse.UnitTests/Cases/Conformance/ConformanceTestsBase.cs @@ -160,7 +160,7 @@ public async Task When_The_Workflow_Is_Executed_Async() Input = Input } }); - ExecutionContext = ActivatorUtilities.CreateInstance(Services, Definition, Instance); + ExecutionContext = ActivatorUtilities.CreateInstance(Services, Definition, Instance); var executor = ActivatorUtilities.CreateInstance(Services, ExecutionContext); await executor.ExecuteAsync(); } diff --git a/tests/Synapse.UnitTests/Services/MockDocumentApiClient.cs b/tests/Synapse.UnitTests/Services/MockDocumentApiClient.cs index ca3e87f61..9aaf44293 100644 --- a/tests/Synapse.UnitTests/Services/MockDocumentApiClient.cs +++ b/tests/Synapse.UnitTests/Services/MockDocumentApiClient.cs @@ -31,6 +31,6 @@ public async Task UpdateAsync(string id, object content, CancellationToken cance await documents.UpdateAsync(document, cancellationToken); } - public Task DeletesAsync(string id, CancellationToken cancellationToken = default) => documents.RemoveAsync(id, cancellationToken); + public Task DeleteAsync(string id, CancellationToken cancellationToken = default) => documents.RemoveAsync(id, cancellationToken); } diff --git a/tests/Synapse.UnitTests/Services/MockWorkflowExecutionContextFactory.cs b/tests/Synapse.UnitTests/Services/MockWorkflowExecutionContextFactory.cs index 499deb5ee..e6040d2f0 100644 --- a/tests/Synapse.UnitTests/Services/MockWorkflowExecutionContextFactory.cs +++ b/tests/Synapse.UnitTests/Services/MockWorkflowExecutionContextFactory.cs @@ -22,7 +22,7 @@ namespace Synapse.UnitTests.Services; internal static class MockWorkflowExecutionContextFactory { - internal static IWorkflowExecutionContext Create(IServiceProvider services, WorkflowDefinition definition, WorkflowInstance instance) => ActivatorUtilities.CreateInstance(services, definition, instance); + internal static IWorkflowExecutionContext Create(IServiceProvider services, WorkflowDefinition definition, WorkflowInstance instance) => ActivatorUtilities.CreateInstance(services, definition, instance); internal static async Task CreateAsync(IServiceProvider services, WorkflowDefinition? workflowDefinition = null, EquatableDictionary? input = null) {