Skip to content

Commit

Permalink
fix(Runner): Fixed the TryTaskExecutor by updating context data aft…
Browse files Browse the repository at this point in the history
…er running nested `try` and `catch` tasks

fix(Api): Fixed the cluster and namespaced resource controllers to handle cancellation-related exceptions when enumerating SSE events
fix(Operator): Fixed the `WorkflowController` not to throw when version compared version lists have no diff patch operations
fix(Operator): Fixed the `WorkflowInstanceController` to delete all related documents

Signed-off-by: Charles d'Avernas <[email protected]>
  • Loading branch information
cdavernas committed Dec 5, 2024
1 parent 7fd2983 commit 3e04a16
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 32 deletions.
24 changes: 16 additions & 8 deletions src/api/Synapse.Api.Http/ClusterResourceController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,16 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string? labelSel
this.Response.Headers.CacheControl = "no-cache";
this.Response.Headers.Connection = "keep-alive";
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
try
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
return this.Ok();
}

Expand Down Expand Up @@ -149,12 +153,16 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, Ca
this.Response.Headers.CacheControl = "no-cache";
this.Response.Headers.Connection = "keep-alive";
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
try
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
return this.Ok();
}

Expand Down
26 changes: 16 additions & 10 deletions src/api/Synapse.Api.Http/NamespacedResourceController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure.ResourceOriented;

namespace Synapse.Api.Http;

/// <summary>
Expand Down Expand Up @@ -164,12 +162,16 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string @namespac
this.Response.Headers.CacheControl = "no-cache";
this.Response.Headers.Connection = "keep-alive";
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
try
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when(ex is TaskCanceledException || ex is OperationCanceledException) { }
return this.Ok();
}

Expand Down Expand Up @@ -206,12 +208,16 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, st
this.Response.Headers.CacheControl = "no-cache";
this.Response.Headers.Connection = "keep-alive";
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach(var e in response.Data!.WithCancellation(cancellationToken))
try
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
return this.Ok();
}

Expand Down
3 changes: 2 additions & 1 deletion src/operator/Synapse.Operator/Services/WorkflowController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public override async Task StartAsync(CancellationToken cancellationToken)
this.Operator!.Select(b => b.Resource.Spec.Selector).DistinctUntilChanged().SubscribeAsync(this.OnResourceSelectorChangedAsync, cancellationToken: cancellationToken);
this.Where(e => e.Type == ResourceWatchEventType.Updated)
.Select(e => new { Workflow = e.Resource, e.Resource.Spec.Versions })
.DistinctUntilChanged()
.DistinctUntilChanged(s => s.Versions)
.Scan((Previous: (EquatableList<WorkflowDefinition>)null!, Current: (EquatableList<WorkflowDefinition>)null!, Workflow: (Workflow)null!), (accumulator, current) => (accumulator.Current, current.Versions, current.Workflow))
.SubscribeAsync(async value => await this.OnWorkflowVersionChangedAsync(value.Workflow, value.Previous, value.Current).ConfigureAwait(false), cancellationToken: cancellationToken);
await this.OnResourceSelectorChangedAsync(this.Operator!.Resource.Spec.Selector).ConfigureAwait(false);
Expand Down Expand Up @@ -190,6 +190,7 @@ protected virtual async Task OnWorkflowVersionChangedAsync(Workflow workflow, Eq
if (workflow.Metadata.Labels == null || !workflow.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out _)) if (!await this.TryClaimAsync(workflow, this.CancellationTokenSource.Token).ConfigureAwait(false)) return;
if (workflow.Metadata.Labels?[SynapseDefaults.Resources.Labels.Operator] != this.Operator.Resource.GetQualifiedName()) return;
var diffPatch = JsonPatchUtility.CreateJsonPatchFromDiff(previous, current);
if (diffPatch.Operations.Count < 1) return;
var operation = diffPatch.Operations[0].Op;
if (this.Schedulers.TryRemove(workflow.GetQualifiedName(), out var scheduler)) await scheduler.DisposeAsync().ConfigureAwait(false);
if (operation == OperationType.Remove) return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure.Services;

namespace Synapse.Operator.Services;

/// <summary>
Expand All @@ -21,7 +23,8 @@ namespace Synapse.Operator.Services;
/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
/// <param name="operatorController">The service used to access the current <see cref="Resources.Operator"/></param>
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController)
/// <param name="documents">The <see cref="IRepository"/> used to manage <see cref="Document"/>s</param>
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IRepository<Document, string> documents)
: ResourceController<WorkflowInstance>(loggerFactory, controllerOptions, repository)
{

Expand All @@ -35,6 +38,11 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge
/// </summary>
protected IResourceMonitor<Resources.Operator> Operator => operatorController.Operator;

/// <summary>
/// Gets the <see cref="IRepository"/> used to manage <see cref="Document"/>s
/// </summary>
protected IRepository<Document, string> Documents => documents;

/// <summary>
/// Gets a <see cref="ConcurrentDictionary{TKey, TValue}"/> that contains current <see cref="WorkflowInstanceHandler"/>es
/// </summary>
Expand Down Expand Up @@ -139,24 +147,54 @@ public override async Task StopAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
protected override async Task OnResourceCreatedAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken = default)
{
await base.OnResourceCreatedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
if (!await this.TryClaimAsync(workflowInstance, cancellationToken).ConfigureAwait(false)) return;
var handler = await this.CreateWorkflowInstanceHandlerAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
await handler.HandleAsync(cancellationToken).ConfigureAwait(false);
try
{
await base.OnResourceCreatedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
if (!await this.TryClaimAsync(workflowInstance, cancellationToken).ConfigureAwait(false)) return;
var handler = await this.CreateWorkflowInstanceHandlerAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
await handler.HandleAsync(cancellationToken).ConfigureAwait(false);
}
catch(Exception ex)
{
this.Logger.LogError("An error occured while handling the creation of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
}
}

/// <inheritdoc/>
protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken = default)
{
await base.OnResourceDeletedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false);
var selectors = new LabelSelector[]
try
{
await base.OnResourceDeletedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false);
var selectors = new LabelSelector[]
{
new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName())
};
await foreach (var correlation in this.Repository.GetAllAsync<Correlation>(null, selectors, cancellationToken: cancellationToken))
};
await foreach (var correlation in this.Repository.GetAllAsync<Correlation>(null, selectors, cancellationToken: cancellationToken))
{
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
}
if (workflowInstance.Status != null)
{
var documentReferences = new List<string>();
if (!string.IsNullOrWhiteSpace(workflowInstance.Status.ContextReference)) documentReferences.Add(workflowInstance.Status.ContextReference);
if (!string.IsNullOrWhiteSpace(workflowInstance.Status.OutputReference)) documentReferences.Add(workflowInstance.Status.OutputReference);
if (workflowInstance.Status.Tasks != null)
{
foreach (var task in workflowInstance.Status.Tasks)
{
if (!string.IsNullOrWhiteSpace(task.ContextReference)) documentReferences.Add(task.ContextReference);
if (!string.IsNullOrWhiteSpace(task.InputReference)) documentReferences.Add(task.InputReference);
if (!string.IsNullOrWhiteSpace(task.OutputReference)) documentReferences.Add(task.OutputReference);
}
}
foreach (var documentReference in documentReferences.Distinct()) await this.Documents.RemoveAsync(documentReference, cancellationToken).ConfigureAwait(false);
}
}
catch(Exception ex)
{
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
this.Logger.LogError("An error occured while handling the deletion of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/Synapse.Operator/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
},
"Runtime": {
"Native": {
"Directory": "..\\..\\..\\..\\..\\runner\\Synapse.Runner\\bin\\Debug\\net8.0\\",
"Directory": "..\\..\\..\\..\\..\\runner\\Synapse.Runner\\bin\\Debug\\net9.0\\",
"Executable": "Synapse.Runner.exe"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ protected virtual async Task OnTryFaultedAsync(ITaskExecutor executor, Exception
protected virtual async Task OnTryCompletedAsync(ITaskExecutor executor, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(executor);
if (this.Task.ContextData != executor.Task.ContextData) await this.Task.SetContextDataAsync(executor.Task.ContextData, cancellationToken).ConfigureAwait(false);
var last = executor.Task.Instance;
var output = executor.Task.Output!;
this.Executors.Remove(executor);
Expand Down Expand Up @@ -188,6 +189,7 @@ protected virtual async Task OnHandlerFaultAsync(ITaskExecutor executor, Excepti
protected virtual async Task OnHandlerCompletedAsync(ITaskExecutor executor, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(executor);
if (this.Task.ContextData != executor.Task.ContextData) await this.Task.SetContextDataAsync(executor.Task.ContextData, cancellationToken).ConfigureAwait(false);
var last = executor.Task.Instance;
var output = executor.Task.Output!;
this.Executors.Remove(executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected override void ConfigureServices(IServiceCollection services)
};
options.Runtime.Native = new()
{
Directory = Path.Combine("..", "..", "..", "..", "..", "src", "runner", "Synapse.Runner", "bin", "Debug", "net8.0"),
Directory = Path.Combine("..", "..", "..", "..", "..", "src", "runner", "Synapse.Runner", "bin", "Debug", "net9.0"),
Executable = "Synapse.Runner"
};
});
Expand Down

0 comments on commit 3e04a16

Please sign in to comment.