Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: Charles d'Avernas <[email protected]>
  • Loading branch information
cdavernas committed Jan 7, 2025
1 parent e680ab7 commit 86535bc
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
using Neuroglia.Serialization;
using ServerlessWorkflow.Sdk;
using System.Net;
using System.Xml.Schema;
using System.Xml;
using Avro.Generic;
using System.Xml.Schema;

namespace Synapse.Core.Infrastructure.Services;

Expand Down
5 changes: 5 additions & 0 deletions src/runner/Synapse.Runner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// limitations under the License.

using Moq;
using Neuroglia.AsyncApi;
using Neuroglia.AsyncApi.Client;
using Neuroglia.AsyncApi.Client.Bindings;
using Neuroglia.Serialization.Xml;
using NReco.Logging.File;
using ServerlessWorkflow.Sdk.IO;
Expand Down Expand Up @@ -97,6 +100,8 @@
services.AddServerlessWorkflowIO();
services.AddNodeJSScriptExecutor();
services.AddPythonScriptExecutor();
services.AddAsyncApi();
services.AddAsyncApiClient(options => options.AddAllBindingHandlers());
services.AddSingleton<SecretsManager>();
services.AddSingleton<ISecretsManager>(provider => provider.GetRequiredService<SecretsManager>());
services.AddSingleton<IHostedService>(provider => provider.GetRequiredService<SecretsManager>());
Expand Down
214 changes: 214 additions & 0 deletions src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia;
using Neuroglia.AsyncApi;
using Neuroglia.AsyncApi.Client;
using Neuroglia.AsyncApi.Client.Services;
using Neuroglia.AsyncApi.IO;
using Neuroglia.AsyncApi.v3;
using Neuroglia.Data.Expressions;

namespace Synapse.Runner.Services.Executors;

/// <summary>
/// Represents an <see cref="ITaskExecutor"/> used to execute AsyncAPI <see cref="CallTaskDefinition"/>s using an <see cref="HttpClient"/>
/// </summary>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="logger">The service used to perform logging</param>
/// <param name="executionContextFactory">The service used to create <see cref="ITaskExecutionContext"/>s</param>
/// <param name="executorFactory">The service used to create <see cref="ITaskExecutor"/>s</param>
/// <param name="context">The current <see cref="ITaskExecutionContext"/></param>
/// <param name="schemaHandlerProvider">The service used to provide <see cref="Core.Infrastructure.Services.ISchemaHandler"/> implementations</param>
/// <param name="serializer">The service used to serialize/deserialize objects to/from JSON</param>
/// <param name="httpClientFactory">The service used to create <see cref="HttpClient"/>s</param>
/// <param name="asyncApiDocumentReader">The service used to read <see cref="IAsyncApiDocument"/>s</param>
/// <param name="asyncApiClientFactory">The service used to create <see cref="IAsyncApiClient"/>s</param>
public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger<AsyncApiCallExecutor> logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory,
ITaskExecutionContext<CallTaskDefinition> context, Core.Infrastructure.Services.ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer, IHttpClientFactory httpClientFactory, IAsyncApiDocumentReader asyncApiDocumentReader, IAsyncApiClientFactory asyncApiClientFactory)
: TaskExecutor<CallTaskDefinition>(serviceProvider, logger, executionContextFactory, executorFactory, context, schemaHandlerProvider, serializer)
{

/// <summary>
/// Gets the service used to create <see cref="HttpClient"/>s
/// </summary>
protected IHttpClientFactory HttpClientFactory { get; } = httpClientFactory;

/// <summary>
/// Gets the service used to read <see cref="IAsyncApiDocument"/>s
/// </summary>
protected IAsyncApiDocumentReader AsyncApiDocumentReader { get; } = asyncApiDocumentReader;

/// <summary>
/// Gets the service used to create <see cref="IAsyncApiClient"/>s
/// </summary>
protected IAsyncApiClientFactory AsyncApiClientFactory { get; } = asyncApiClientFactory;

/// <summary>
/// Gets the definition of the AsyncAPI call to perform
/// </summary>
protected AsyncApiCallDefinition? AsyncApi { get; set; }

/// <summary>
/// Gets/sets the <see cref="IAsyncApiDocument"/> that defines the AsyncAPI operation to call
/// </summary>
protected V3AsyncApiDocument? Document { get; set; }

/// <summary>
/// Gets the <see cref="V3OperationDefinition"/> to call
/// </summary>
protected KeyValuePair<string, V3OperationDefinition> Operation { get; set; }

/// <summary>
/// Gets an object used to describe the credentials, if any, used to authenticate a user agent with the AsyncAPI application
/// </summary>
protected AuthorizationInfo? Authorization { get; set; }

/// <summary>
/// Gets/sets the payload, if any, of the message to publish, in case the <see cref="Operation"/>'s <see cref="V3OperationDefinition.Action"/> has been set to <see cref="V3OperationAction.Receive"/>
/// </summary>
protected object? MessagePayload { get; set; }

/// <summary>
/// Gets/sets the headers, if any, of the message to publish, in case the <see cref="Operation"/>'s <see cref="V3OperationDefinition.Action"/> has been set to <see cref="V3OperationAction.Receive"/>
/// </summary>
protected object? MessageHeaders { get; set; }

/// <inheritdoc/>
protected override async Task DoInitializeAsync(CancellationToken cancellationToken)
{
this.AsyncApi = (AsyncApiCallDefinition)this.JsonSerializer.Convert(this.Task.Definition.With, typeof(AsyncApiCallDefinition))!;
using var httpClient = this.HttpClientFactory.CreateClient();
await httpClient.ConfigureAuthenticationAsync(this.AsyncApi.Document.Endpoint.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false);
var uriString = StringFormatter.NamedFormat(this.AsyncApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary());
if (uriString.IsRuntimeExpression()) uriString = await this.Task.Workflow.Expressions.EvaluateAsync<string>(uriString, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The AsyncAPI endpoint URI cannot be null or empty");
if (!Uri.TryCreate(uriString, UriKind.RelativeOrAbsolute, out var uri) || uri == null) throw new Exception($"Failed to parse the specified string '{uriString}' into a new URI");
using var request = new HttpRequestMessage(HttpMethod.Get, uriString);
using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
var responseContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
this.Logger.LogInformation("Failed to retrieve the AsyncAPI document at location '{uri}'. The remote server responded with a non-success status code '{statusCode}'.", uri, response.StatusCode);
this.Logger.LogDebug("Response content:\r\n{responseContent}", responseContent ?? "None");
response.EnsureSuccessStatusCode();
}
using var responseStream = await response.Content!.ReadAsStreamAsync(cancellationToken)!;
var document = await this.AsyncApiDocumentReader.ReadAsync(responseStream, cancellationToken).ConfigureAwait(false);
if (document is not V3AsyncApiDocument v3Document) throw new NotSupportedException("Synapse only supports AsyncAPI v3.0.0 at the moment");
this.Document = v3Document;
var operationId = this.AsyncApi.OperationRef;
if (operationId.IsRuntimeExpression()) operationId = await this.Task.Workflow.Expressions.EvaluateAsync<string>(operationId, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
if (string.IsNullOrWhiteSpace(operationId)) throw new NullReferenceException("The operation ref cannot be null or empty");
var operation = this.Document.Operations.FirstOrDefault(o => o.Key == operationId);
if (operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{operationId}' in AsyncAPI document at '{uri}'");
if (this.AsyncApi.Authentication != null) this.Authorization = await AuthorizationInfo.CreateAsync(this.AsyncApi.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false);
switch (this.Operation.Value.Action)
{
case V3OperationAction.Receive:
await this.BuildMessagePayloadAsync(cancellationToken).ConfigureAwait(false);
await this.BuildMessageHeadersAsync(cancellationToken).ConfigureAwait(false);
break;
case V3OperationAction.Send:

break;
default:
throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported");
}
}

/// <summary>
/// Builds the payload, if any, of the message to publish, in case the <see cref="Operation"/>'s <see cref="V3OperationDefinition.Action"/> has been set to <see cref="V3OperationAction.Receive"/>
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task BuildMessagePayloadAsync(CancellationToken cancellationToken = default)
{
if (this.AsyncApi == null || this.Operation == null) throw new InvalidOperationException("The executor must be initialized before execution");

Check failure on line 137 in src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

Operator '==' cannot be applied to operands of type 'KeyValuePair<string, V3OperationDefinition>' and '<null>'

Check failure on line 137 in src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

Operator '==' cannot be applied to operands of type 'KeyValuePair<string, V3OperationDefinition>' and '<null>'
if (this.Task.Input == null) this.MessagePayload = new { };
if (this.AsyncApi.Payload == null) return;
var arguments = this.GetExpressionEvaluationArguments();
if (this.Authorization != null)
{
arguments ??= new Dictionary<string, object>();
arguments.Add("authorization", this.Authorization);
}
this.MessagePayload = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.AsyncApi.Payload, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Builds the headers, if any, of the message to publish, in case the <see cref="Operation"/>'s <see cref="V3OperationDefinition.Action"/> has been set to <see cref="V3OperationAction.Receive"/>
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task BuildMessageHeadersAsync(CancellationToken cancellationToken = default)
{
if (this.AsyncApi == null || this.Operation == null) throw new InvalidOperationException("The executor must be initialized before execution");

Check failure on line 156 in src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

Operator '==' cannot be applied to operands of type 'KeyValuePair<string, V3OperationDefinition>' and '<null>'

Check failure on line 156 in src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

Operator '==' cannot be applied to operands of type 'KeyValuePair<string, V3OperationDefinition>' and '<null>'
if (this.AsyncApi.Headers == null) return;

Check failure on line 157 in src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

'AsyncApiCallDefinition' does not contain a definition for 'Headers' and no accessible extension method 'Headers' accepting a first argument of type 'AsyncApiCallDefinition' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 157 in src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

'AsyncApiCallDefinition' does not contain a definition for 'Headers' and no accessible extension method 'Headers' accepting a first argument of type 'AsyncApiCallDefinition' could be found (are you missing a using directive or an assembly reference?)
var arguments = this.GetExpressionEvaluationArguments();
if (this.Authorization != null)
{
arguments ??= new Dictionary<string, object>();
arguments.Add("authorization", this.Authorization);
}
this.MessageHeaders = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.AsyncApi.Headers, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false);

Check failure on line 164 in src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

'AsyncApiCallDefinition' does not contain a definition for 'Headers' and no accessible extension method 'Headers' accepting a first argument of type 'AsyncApiCallDefinition' could be found (are you missing a using directive or an assembly reference?)
}

/// <inheritdoc/>
protected override Task DoExecuteAsync(CancellationToken cancellationToken)
{
if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
switch (this.Operation.Value.Action)
{
case V3OperationAction.Receive:
return this.DoExecutePublishOperationAsync(cancellationToken);
case V3OperationAction.Send:
return this.DoExecuteSubscribeOperationAsync(cancellationToken);
default:
throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported");
}
}

/// <summary>
/// Executes an AsyncAPI publish operation
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task DoExecutePublishOperationAsync(CancellationToken cancellationToken)
{
if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document);
var parameters = new AsyncApiPublishOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol)

Check failure on line 191 in src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

'AsyncApiCallDefinition' does not contain a definition for 'Protocol' and no accessible extension method 'Protocol' accepting a first argument of type 'AsyncApiCallDefinition' could be found (are you missing a using directive or an assembly reference?)
{
Payload = this.MessagePayload,
Headers = this.MessageHeaders
};
await using var result = await asyncApiClient.PublishAsync(parameters, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI publish operation");
}

/// <summary>
/// Executes an AsyncAPI subscribe operation
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken cancellationToken)
{
if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document);
var parameters = new AsyncApiSubscribeOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol);

Check failure on line 209 in src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

'AsyncApiCallDefinition' does not contain a definition for 'Protocol' and no accessible extension method 'Protocol' accepting a first argument of type 'AsyncApiCallDefinition' could be found (are you missing a using directive or an assembly reference?)
await using var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI subscribe operation");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace Synapse.Runner.Services.Executors;

/// <summary>
/// Represents an <see cref="ITaskExecutor"/> used to execute http <see cref="CallTaskDefinition"/>s using an <see cref="HttpClient"/>
/// Represents an <see cref="ITaskExecutor"/> used to execute OpenAPI <see cref="CallTaskDefinition"/>s using an <see cref="HttpClient"/>
/// </summary>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="logger">The service used to perform logging</param>
Expand Down Expand Up @@ -111,7 +111,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
await httpClient.ConfigureAuthenticationAsync(this.OpenApi.Document.Endpoint.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false);
var uriString = StringFormatter.NamedFormat(this.OpenApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary());
if (uriString.IsRuntimeExpression()) uriString = await this.Task.Workflow.Expressions.EvaluateAsync<string>(uriString, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The OpenAPI endpoint URI cannot be null or whitespace");
if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The OpenAPI endpoint URI cannot be null or empty");
if (!Uri.TryCreate(uriString, UriKind.RelativeOrAbsolute, out var uri) || uri == null) throw new Exception($"Failed to parse the specified string '{uriString}' into a new URI");
using var request = new HttpRequestMessage(HttpMethod.Get, uriString);
using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -287,4 +287,4 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
}

}
}
1 change: 1 addition & 0 deletions src/runner/Synapse.Runner/Services/TaskExecutorFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected virtual ITaskExecutor CreateCallTaskExecutor(IServiceProvider serviceP
ArgumentNullException.ThrowIfNull(context);
return context.Definition.Call switch
{
Function.AsyncApi => ActivatorUtilities.CreateInstance<AsyncApiCallExecutor>(serviceProvider, context),
Function.Grpc => ActivatorUtilities.CreateInstance<GrpcCallExecutor>(serviceProvider, context),
Function.Http => ActivatorUtilities.CreateInstance<HttpCallExecutor>(serviceProvider, context),
Function.OpenApi => ActivatorUtilities.CreateInstance<OpenApiCallExecutor>(serviceProvider, context),
Expand Down
1 change: 0 additions & 1 deletion src/runner/Synapse.Runner/Services/WorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// limitations under the License.

using Neuroglia.Data.Infrastructure.ResourceOriented;
using ServerlessWorkflow.Sdk.Models;

namespace Synapse.Runner.Services;

Expand Down
2 changes: 2 additions & 0 deletions src/runner/Synapse.Runner/Synapse.Runner.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
<PackageReference Include="MimeKit" Version="4.8.0" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="Neuroglia.AsyncApi.Client.Bindings.All" Version="3.0.1" />
<PackageReference Include="Neuroglia.AsyncApi.DependencyInjectionExtensions" Version="3.0.1" />
<PackageReference Include="Neuroglia.Data.Expressions.JavaScript" Version="4.18.1" />
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.18.1" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents.Infrastructure" Version="4.18.1" />
Expand Down

0 comments on commit 86535bc

Please sign in to comment.