From 86535bc9d642cdfee1c390fb7cca9e4428a2bf32 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Tue, 7 Jan 2025 17:31:03 +0100 Subject: [PATCH] Initial commit Signed-off-by: Charles d'Avernas --- .../Services/XmlSchemaHandler.cs | 3 +- src/runner/Synapse.Runner/Program.cs | 5 + .../Executors/AsyncApiCallExecutor.cs | 214 ++++++++++++++++++ .../Services/Executors/OpenApiCallExecutor.cs | 6 +- .../Services/TaskExecutorFactory.cs | 1 + .../Services/WorkflowExecutor.cs | 1 - .../Synapse.Runner/Synapse.Runner.csproj | 2 + 7 files changed, 226 insertions(+), 6 deletions(-) create mode 100644 src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs diff --git a/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs b/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs index 41f919bba..6fc558d33 100644 --- a/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs +++ b/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs @@ -15,9 +15,8 @@ using Neuroglia.Serialization; using ServerlessWorkflow.Sdk; using System.Net; -using System.Xml.Schema; using System.Xml; -using Avro.Generic; +using System.Xml.Schema; namespace Synapse.Core.Infrastructure.Services; diff --git a/src/runner/Synapse.Runner/Program.cs b/src/runner/Synapse.Runner/Program.cs index 292bd6632..48662b4a6 100644 --- a/src/runner/Synapse.Runner/Program.cs +++ b/src/runner/Synapse.Runner/Program.cs @@ -12,6 +12,9 @@ // limitations under the License. using Moq; +using Neuroglia.AsyncApi; +using Neuroglia.AsyncApi.Client; +using Neuroglia.AsyncApi.Client.Bindings; using Neuroglia.Serialization.Xml; using NReco.Logging.File; using ServerlessWorkflow.Sdk.IO; @@ -97,6 +100,8 @@ services.AddServerlessWorkflowIO(); services.AddNodeJSScriptExecutor(); services.AddPythonScriptExecutor(); + services.AddAsyncApi(); + services.AddAsyncApiClient(options => options.AddAllBindingHandlers()); services.AddSingleton(); services.AddSingleton(provider => provider.GetRequiredService()); services.AddSingleton(provider => provider.GetRequiredService()); diff --git a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs new file mode 100644 index 000000000..3d09ff2d8 --- /dev/null +++ b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs @@ -0,0 +1,214 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia; +using Neuroglia.AsyncApi; +using Neuroglia.AsyncApi.Client; +using Neuroglia.AsyncApi.Client.Services; +using Neuroglia.AsyncApi.IO; +using Neuroglia.AsyncApi.v3; +using Neuroglia.Data.Expressions; + +namespace Synapse.Runner.Services.Executors; + +/// +/// Represents an used to execute AsyncAPI s using an +/// +/// The current +/// The service used to perform logging +/// The service used to create s +/// The service used to create s +/// The current +/// The service used to provide implementations +/// The service used to serialize/deserialize objects to/from JSON +/// The service used to create s +/// The service used to read s +/// The service used to create s +public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory, + ITaskExecutionContext context, Core.Infrastructure.Services.ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer, IHttpClientFactory httpClientFactory, IAsyncApiDocumentReader asyncApiDocumentReader, IAsyncApiClientFactory asyncApiClientFactory) + : TaskExecutor(serviceProvider, logger, executionContextFactory, executorFactory, context, schemaHandlerProvider, serializer) +{ + + /// + /// Gets the service used to create s + /// + protected IHttpClientFactory HttpClientFactory { get; } = httpClientFactory; + + /// + /// Gets the service used to read s + /// + protected IAsyncApiDocumentReader AsyncApiDocumentReader { get; } = asyncApiDocumentReader; + + /// + /// Gets the service used to create s + /// + protected IAsyncApiClientFactory AsyncApiClientFactory { get; } = asyncApiClientFactory; + + /// + /// Gets the definition of the AsyncAPI call to perform + /// + protected AsyncApiCallDefinition? AsyncApi { get; set; } + + /// + /// Gets/sets the that defines the AsyncAPI operation to call + /// + protected V3AsyncApiDocument? Document { get; set; } + + /// + /// Gets the to call + /// + protected KeyValuePair Operation { get; set; } + + /// + /// Gets an object used to describe the credentials, if any, used to authenticate a user agent with the AsyncAPI application + /// + protected AuthorizationInfo? Authorization { get; set; } + + /// + /// Gets/sets the payload, if any, of the message to publish, in case the 's has been set to + /// + protected object? MessagePayload { get; set; } + + /// + /// Gets/sets the headers, if any, of the message to publish, in case the 's has been set to + /// + protected object? MessageHeaders { get; set; } + + /// + protected override async Task DoInitializeAsync(CancellationToken cancellationToken) + { + this.AsyncApi = (AsyncApiCallDefinition)this.JsonSerializer.Convert(this.Task.Definition.With, typeof(AsyncApiCallDefinition))!; + using var httpClient = this.HttpClientFactory.CreateClient(); + await httpClient.ConfigureAuthenticationAsync(this.AsyncApi.Document.Endpoint.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false); + var uriString = StringFormatter.NamedFormat(this.AsyncApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary()); + if (uriString.IsRuntimeExpression()) uriString = await this.Task.Workflow.Expressions.EvaluateAsync(uriString, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); + if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The AsyncAPI endpoint URI cannot be null or empty"); + if (!Uri.TryCreate(uriString, UriKind.RelativeOrAbsolute, out var uri) || uri == null) throw new Exception($"Failed to parse the specified string '{uriString}' into a new URI"); + using var request = new HttpRequestMessage(HttpMethod.Get, uriString); + using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); + if (!response.IsSuccessStatusCode) + { + var responseContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Failed to retrieve the AsyncAPI document at location '{uri}'. The remote server responded with a non-success status code '{statusCode}'.", uri, response.StatusCode); + this.Logger.LogDebug("Response content:\r\n{responseContent}", responseContent ?? "None"); + response.EnsureSuccessStatusCode(); + } + using var responseStream = await response.Content!.ReadAsStreamAsync(cancellationToken)!; + var document = await this.AsyncApiDocumentReader.ReadAsync(responseStream, cancellationToken).ConfigureAwait(false); + if (document is not V3AsyncApiDocument v3Document) throw new NotSupportedException("Synapse only supports AsyncAPI v3.0.0 at the moment"); + this.Document = v3Document; + var operationId = this.AsyncApi.OperationRef; + if (operationId.IsRuntimeExpression()) operationId = await this.Task.Workflow.Expressions.EvaluateAsync(operationId, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); + if (string.IsNullOrWhiteSpace(operationId)) throw new NullReferenceException("The operation ref cannot be null or empty"); + var operation = this.Document.Operations.FirstOrDefault(o => o.Key == operationId); + if (operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{operationId}' in AsyncAPI document at '{uri}'"); + if (this.AsyncApi.Authentication != null) this.Authorization = await AuthorizationInfo.CreateAsync(this.AsyncApi.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false); + switch (this.Operation.Value.Action) + { + case V3OperationAction.Receive: + await this.BuildMessagePayloadAsync(cancellationToken).ConfigureAwait(false); + await this.BuildMessageHeadersAsync(cancellationToken).ConfigureAwait(false); + break; + case V3OperationAction.Send: + + break; + default: + throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported"); + } + } + + /// + /// Builds the payload, if any, of the message to publish, in case the 's has been set to + /// + /// A + /// A new awaitable + protected virtual async Task BuildMessagePayloadAsync(CancellationToken cancellationToken = default) + { + if (this.AsyncApi == null || this.Operation == null) throw new InvalidOperationException("The executor must be initialized before execution"); + if (this.Task.Input == null) this.MessagePayload = new { }; + if (this.AsyncApi.Payload == null) return; + var arguments = this.GetExpressionEvaluationArguments(); + if (this.Authorization != null) + { + arguments ??= new Dictionary(); + arguments.Add("authorization", this.Authorization); + } + this.MessagePayload = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Payload, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false); + } + + /// + /// Builds the headers, if any, of the message to publish, in case the 's has been set to + /// + /// A + /// A new awaitable + protected virtual async Task BuildMessageHeadersAsync(CancellationToken cancellationToken = default) + { + if (this.AsyncApi == null || this.Operation == null) throw new InvalidOperationException("The executor must be initialized before execution"); + if (this.AsyncApi.Headers == null) return; + var arguments = this.GetExpressionEvaluationArguments(); + if (this.Authorization != null) + { + arguments ??= new Dictionary(); + arguments.Add("authorization", this.Authorization); + } + this.MessageHeaders = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Headers, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false); + } + + /// + protected override Task DoExecuteAsync(CancellationToken cancellationToken) + { + if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); + switch (this.Operation.Value.Action) + { + case V3OperationAction.Receive: + return this.DoExecutePublishOperationAsync(cancellationToken); + case V3OperationAction.Send: + return this.DoExecuteSubscribeOperationAsync(cancellationToken); + default: + throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported"); + } + } + + /// + /// Executes an AsyncAPI publish operation + /// + /// A + /// A new awaitable + protected virtual async Task DoExecutePublishOperationAsync(CancellationToken cancellationToken) + { + if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); + await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document); + var parameters = new AsyncApiPublishOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol) + { + Payload = this.MessagePayload, + Headers = this.MessageHeaders + }; + await using var result = await asyncApiClient.PublishAsync(parameters, cancellationToken).ConfigureAwait(false); + if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI publish operation"); + } + + /// + /// Executes an AsyncAPI subscribe operation + /// + /// A + /// A new awaitable + protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken cancellationToken) + { + if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); + await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document); + var parameters = new AsyncApiSubscribeOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol); + await using var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false); + if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI subscribe operation"); + } + +} \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs index 8536661c6..179de2078 100644 --- a/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs @@ -22,7 +22,7 @@ namespace Synapse.Runner.Services.Executors; /// -/// Represents an used to execute http s using an +/// Represents an used to execute OpenAPI s using an /// /// The current /// The service used to perform logging @@ -111,7 +111,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo await httpClient.ConfigureAuthenticationAsync(this.OpenApi.Document.Endpoint.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false); var uriString = StringFormatter.NamedFormat(this.OpenApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary()); if (uriString.IsRuntimeExpression()) uriString = await this.Task.Workflow.Expressions.EvaluateAsync(uriString, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); - if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The OpenAPI endpoint URI cannot be null or whitespace"); + if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The OpenAPI endpoint URI cannot be null or empty"); if (!Uri.TryCreate(uriString, UriKind.RelativeOrAbsolute, out var uri) || uri == null) throw new Exception($"Failed to parse the specified string '{uriString}' into a new URI"); using var request = new HttpRequestMessage(HttpMethod.Get, uriString); using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); @@ -287,4 +287,4 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); } -} \ No newline at end of file +} diff --git a/src/runner/Synapse.Runner/Services/TaskExecutorFactory.cs b/src/runner/Synapse.Runner/Services/TaskExecutorFactory.cs index 7804050fb..524a068bc 100644 --- a/src/runner/Synapse.Runner/Services/TaskExecutorFactory.cs +++ b/src/runner/Synapse.Runner/Services/TaskExecutorFactory.cs @@ -67,6 +67,7 @@ protected virtual ITaskExecutor CreateCallTaskExecutor(IServiceProvider serviceP ArgumentNullException.ThrowIfNull(context); return context.Definition.Call switch { + Function.AsyncApi => ActivatorUtilities.CreateInstance(serviceProvider, context), Function.Grpc => ActivatorUtilities.CreateInstance(serviceProvider, context), Function.Http => ActivatorUtilities.CreateInstance(serviceProvider, context), Function.OpenApi => ActivatorUtilities.CreateInstance(serviceProvider, context), diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs index 26c3948dc..4fb81d77f 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs @@ -12,7 +12,6 @@ // limitations under the License. using Neuroglia.Data.Infrastructure.ResourceOriented; -using ServerlessWorkflow.Sdk.Models; namespace Synapse.Runner.Services; diff --git a/src/runner/Synapse.Runner/Synapse.Runner.csproj b/src/runner/Synapse.Runner/Synapse.Runner.csproj index 10b855089..bd007de3a 100644 --- a/src/runner/Synapse.Runner/Synapse.Runner.csproj +++ b/src/runner/Synapse.Runner/Synapse.Runner.csproj @@ -61,6 +61,8 @@ + +