Skip to content

Commit

Permalink
feat(Runner): Implement the AsyncApiCallExecutor
Browse files Browse the repository at this point in the history
Signed-off-by: Charles d'Avernas <[email protected]>
  • Loading branch information
cdavernas committed Jan 10, 2025
1 parent 86535bc commit 076ce93
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="9.0.0" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.2" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha6.2" />
<PackageReference Include="System.Reactive" Version="6.0.1" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/api/Synapse.Api.Http/Synapse.Api.Http.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<ItemGroup>
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.18.1" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.18.1" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.1.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.2.0" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<PackageReference Include="Microsoft.AspNetCore.Authentication.OpenIdConnect" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="9.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.1.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.2.0" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/cli/Synapse.Cli/Synapse.Cli.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" />
<PackageReference Include="moment.net" Version="1.3.4" />
<PackageReference Include="NetEscapades.Configuration.Yaml" Version="3.1.0" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.2" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha6.2" />
<PackageReference Include="Spectre.Console" Version="0.49.1" />
<PackageReference Include="System.CommandLine.NamingConventionBinder" Version="2.0.0-beta4.22272.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<PackageReference Include="Neuroglia.Mediation" Version="4.18.1" />
<PackageReference Include="Neuroglia.Plugins" Version="4.18.1" />
<PackageReference Include="Neuroglia.Serialization.Xml" Version="4.18.1" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.2" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha6.2" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Synapse.Core/Synapse.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented" Version="4.18.1" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents" Version="4.18.1" />
<PackageReference Include="Semver" Version="3.0.0" />
<PackageReference Include="ServerlessWorkflow.Sdk" Version="1.0.0-alpha5.2" />
<PackageReference Include="ServerlessWorkflow.Sdk" Version="1.0.0-alpha6.2" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions src/correlator/Synapse.Correlator/Synapse.Correlator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.18.1" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents.Infrastructure" Version="4.18.1" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.18.1" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.1.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.0.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.2.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.2.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,15 @@ protected INodeViewModel BuildTaskNode(TaskNodeRenderingContext context)
protected virtual NodeViewModel BuildCallTaskNode(TaskNodeRenderingContext<CallTaskDefinition> context)
{
ArgumentNullException.ThrowIfNull(context);
var content = string.Empty;
string content; ;
string callType;
switch (context.TaskDefinition.Call.ToLower())
{
case "asyncapi":
{
var definition = (AsyncApiCallDefinition)this.JsonSerializer.Convert(context.TaskDefinition.With, typeof(AsyncApiCallDefinition))!;
callType = context.TaskDefinition.Call.ToLower();
content = definition.OperationRef;
content = definition.Operation!;
break;
}
case "grpc":
Expand Down
2 changes: 1 addition & 1 deletion src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<ItemGroup>
<PackageReference Include="Blazor.Bootstrap" Version="3.2.0" />
<PackageReference Include="BlazorMonaco" Version="3.2.0" />
<PackageReference Include="BlazorMonaco" Version="3.3.0" />
<PackageReference Include="IdentityModel" Version="7.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Authentication" Version="9.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,21 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
var document = await this.AsyncApiDocumentReader.ReadAsync(responseStream, cancellationToken).ConfigureAwait(false);
if (document is not V3AsyncApiDocument v3Document) throw new NotSupportedException("Synapse only supports AsyncAPI v3.0.0 at the moment");
this.Document = v3Document;
var operationId = this.AsyncApi.OperationRef;
if (string.IsNullOrWhiteSpace(this.AsyncApi.Operation)) throw new NullReferenceException("The 'operation' parameter must be set when performing an AsyncAPI v3 call");
var operationId = this.AsyncApi.Operation;
if (operationId.IsRuntimeExpression()) operationId = await this.Task.Workflow.Expressions.EvaluateAsync<string>(operationId, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
if (string.IsNullOrWhiteSpace(operationId)) throw new NullReferenceException("The operation ref cannot be null or empty");
var operation = this.Document.Operations.FirstOrDefault(o => o.Key == operationId);
if (operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{operationId}' in AsyncAPI document at '{uri}'");
this.Operation = this.Document.Operations.FirstOrDefault(o => o.Key == operationId);
if (this.Operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{operationId}' in AsyncAPI document at '{uri}'");
if (this.AsyncApi.Authentication != null) this.Authorization = await AuthorizationInfo.CreateAsync(this.AsyncApi.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false);
switch (this.Operation.Value.Action)
{
case V3OperationAction.Receive:
await this.BuildMessagePayloadAsync(cancellationToken).ConfigureAwait(false);
await this.BuildMessageHeadersAsync(cancellationToken).ConfigureAwait(false);
break;
case V3OperationAction.Send:

break;
default:
throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported");
case V3OperationAction.Send: break;
default: throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported");
}
}

Expand All @@ -134,16 +132,16 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task BuildMessagePayloadAsync(CancellationToken cancellationToken = default)
{
if (this.AsyncApi == null || this.Operation == null) throw new InvalidOperationException("The executor must be initialized before execution");
if (this.AsyncApi == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
if (this.Task.Input == null) this.MessagePayload = new { };
if (this.AsyncApi.Payload == null) return;
if (this.AsyncApi.Message?.Payload == null) return;
var arguments = this.GetExpressionEvaluationArguments();
if (this.Authorization != null)
{
arguments ??= new Dictionary<string, object>();
arguments.Add("authorization", this.Authorization);
}
this.MessagePayload = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.AsyncApi.Payload, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false);
this.MessagePayload = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.AsyncApi.Message.Payload, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -153,30 +151,27 @@ protected virtual async Task BuildMessagePayloadAsync(CancellationToken cancella
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task BuildMessageHeadersAsync(CancellationToken cancellationToken = default)
{
if (this.AsyncApi == null || this.Operation == null) throw new InvalidOperationException("The executor must be initialized before execution");
if (this.AsyncApi.Headers == null) return;
if (this.AsyncApi == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
if (this.AsyncApi.Message?.Headers == null) return;
var arguments = this.GetExpressionEvaluationArguments();
if (this.Authorization != null)
{
arguments ??= new Dictionary<string, object>();
arguments.Add("authorization", this.Authorization);
}
this.MessageHeaders = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.AsyncApi.Headers, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false);
this.MessageHeaders = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.AsyncApi.Message.Headers, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
protected override Task DoExecuteAsync(CancellationToken cancellationToken)
{
if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
switch (this.Operation.Value.Action)
return this.Operation.Value.Action switch
{
case V3OperationAction.Receive:
return this.DoExecutePublishOperationAsync(cancellationToken);
case V3OperationAction.Send:
return this.DoExecuteSubscribeOperationAsync(cancellationToken);
default:
throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported");
}
V3OperationAction.Receive => this.DoExecutePublishOperationAsync(cancellationToken),
V3OperationAction.Send => this.DoExecuteSubscribeOperationAsync(cancellationToken),
_ => throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported"),
};
}

/// <summary>
Expand All @@ -195,6 +190,7 @@ protected virtual async Task DoExecutePublishOperationAsync(CancellationToken ca
};
await using var result = await asyncApiClient.PublishAsync(parameters, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI publish operation");
await this.SetResultAsync(null, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -205,10 +201,31 @@ protected virtual async Task DoExecutePublishOperationAsync(CancellationToken ca
protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken cancellationToken)
{
if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation");
await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document);
var parameters = new AsyncApiSubscribeOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol);
await using var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI subscribe operation");
if(result.Messages == null)
{
await this.SetResultAsync(null, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
return;
}
var observable = result.Messages;
if (this.AsyncApi.Subscription.Consume.For != null) observable = observable.TakeUntil(Observable.Timer(this.AsyncApi.Subscription.Consume.For.ToTimeSpan()));
if (this.AsyncApi.Subscription.Consume.Amount.HasValue) observable = observable.Take(this.AsyncApi.Subscription.Consume.Amount.Value);
else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) observable = observable.Select(message => Observable.FromAsync(async () =>
{
var keepGoing = await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!,this.GetExpressionEvaluationArguments(),cancellationToken).ConfigureAwait(false);
return (message, keepGoing);
})).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message);
else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) observable = observable.Select(message => Observable.FromAsync(async () =>
{
var keepGoing = !(await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false));
return (message, keepGoing);
})).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message);
var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false);
await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected virtual async Task<TaskDefinition> GetCustomFunctionAsync(string funct
if (components.Length != 2) throw new Exception($"The specified value '{functionName}' is not a valid custom function qualified name ({{name}}:{{version}})");
var name = components[0];
var version = components[1];
if (!SemVersion.TryParse(version, SemVersionStyles.Strict, out _)) throw new Exception($"The specified value '{version}' is not a valid semantic version 2.0");
if (!Semver.SemVersion.TryParse(version, SemVersionStyles.Strict, out _)) throw new Exception($"The specified value '{version}' is not a valid semantic version 2.0");
if (catalogName == SynapseDefaults.Tasks.CustomFunctions.Catalogs.Default)
{
var function = await this.Task.Workflow.CustomFunctions.GetAsync(name, cancellationToken).ConfigureAwait(false) ?? throw new NullReferenceException($"Failed to find the specified custom function '{name}'");
Expand Down
6 changes: 3 additions & 3 deletions src/runner/Synapse.Runner/Synapse.Runner.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@
<ItemGroup>
<PackageReference Include="Docker.DotNet" Version="3.125.15" />
<PackageReference Include="DynamicGrpc" Version="1.4.0" />
<PackageReference Include="Google.Protobuf" Version="3.29.1" />
<PackageReference Include="Google.Protobuf" Version="3.29.3" />
<PackageReference Include="Grpc.Core" Version="2.46.6" />
<PackageReference Include="Microsoft.Extensions.Configuration.KeyPerFile" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.OpenApi.Readers" Version="1.6.22" />
<PackageReference Include="Microsoft.OpenApi.Readers" Version="1.6.23" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
<PackageReference Include="MimeKit" Version="4.8.0" />
<PackageReference Include="MimeKit" Version="4.9.0" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="Neuroglia.AsyncApi.Client.Bindings.All" Version="3.0.1" />
<PackageReference Include="Neuroglia.AsyncApi.DependencyInjectionExtensions" Version="3.0.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PackageReference Include="coverlet.collector" Version="6.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="FluentAssertions" Version="7.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="9.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha5.2" />
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha6.2" />
<PackageReference Include="Testcontainers" Version="4.1.0" />
<PackageReference Include="xunit" Version="2.9.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
<PackageReference Include="xunit" Version="2.9.3" />
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
10 changes: 5 additions & 5 deletions tests/Synapse.UnitTests/Synapse.UnitTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PackageReference Include="coverlet.collector" Version="6.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand All @@ -22,14 +22,14 @@
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.18.1" />
<PackageReference Include="Neuroglia.Data.Infrastructure.Memory" Version="4.18.1" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Redis" Version="4.18.1" />
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha5.2" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.2" />
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha6.2" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha6.2" />
<PackageReference Include="System.Net.Http" Version="4.3.4" />
<PackageReference Include="System.Text.RegularExpressions" Version="4.3.1" />
<PackageReference Include="Testcontainers" Version="4.1.0" />
<PackageReference Include="xunit" Version="2.9.2" />
<PackageReference Include="xunit" Version="2.9.3" />
<PackageReference Include="Xunit.Gherkin.Quick" Version="4.5.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down

0 comments on commit 076ce93

Please sign in to comment.