Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ResumeWorkflowSync methods to SyncWorkflowRunner #1289

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/WorkflowCore/Interface/ISyncWorkflowRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ namespace WorkflowCore.Interface
{
public interface ISyncWorkflowRunner
{
Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data, string reference, TimeSpan timeOut, bool persistSate = true)
Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data, string reference, TimeSpan timeOut, bool persistState = true)
where TData : new();

Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data, string reference, CancellationToken token, bool persistSate = true)
Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data, string reference, CancellationToken token, bool persistState = true)
where TData : new();

Task<WorkflowInstance> ResumeWorkflowSync(string workflowId, TimeSpan timeOut, bool persistState = true);

Task<WorkflowInstance> ResumeWorkflowSync(string workflowId, CancellationToken token, bool persistState = true);
}
}
37 changes: 27 additions & 10 deletions src/WorkflowCore/Services/SyncWorkflowRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ public SyncWorkflowRunner(IWorkflowHost host, IWorkflowExecutor executor, IDistr
}

public Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data,
string reference, TimeSpan timeOut, bool persistSate = true)
string reference, TimeSpan timeOut, bool persistState = true)
where TData : new()
{
return RunWorkflowSync(workflowId, version, data, reference, new CancellationTokenSource(timeOut).Token,
persistSate);
persistState);
}

public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data, string reference, CancellationToken token, bool persistSate = true)
public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data, string reference, CancellationToken token, bool persistState = true)
where TData : new()
{
var def = _registry.GetDefinition(workflowId, version);
Expand Down Expand Up @@ -71,14 +71,31 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in

var id = Guid.NewGuid().ToString();

if (persistSate)
if (persistState)
id = await _persistenceStore.CreateNewWorkflow(wf, token);
else
wf.Id = id;

return await RunWorkflowInstanceSync(wf, token, persistState);
}

public Task<WorkflowInstance> ResumeWorkflowSync(string workflowId, TimeSpan timeOut, bool persistState = true)
{
return ResumeWorkflowSync(workflowId, new CancellationTokenSource(timeOut).Token, persistState);
}

public async Task<WorkflowInstance> ResumeWorkflowSync(string workflowId, CancellationToken token, bool persistState = true)
{
WorkflowInstance wf = await _persistenceStore.GetWorkflowInstance(workflowId);

return await RunWorkflowInstanceSync(wf, token, persistState);
}

private async Task<WorkflowInstance> RunWorkflowInstanceSync(WorkflowInstance wf, CancellationToken token, bool persistState)
{
wf.Status = WorkflowStatus.Runnable;
if (!await _lockService.AcquireLock(id, CancellationToken.None))

if (!await _lockService.AcquireLock(wf.Id, CancellationToken.None))
{
throw new InvalidOperationException();
}
Expand All @@ -88,17 +105,17 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
while ((wf.Status == WorkflowStatus.Runnable) && !token.IsCancellationRequested)
{
await _executor.Execute(wf, token);
if (persistSate)
if (persistState)
await _persistenceStore.PersistWorkflow(wf, token);
}
}
finally
{
await _lockService.ReleaseLock(id);
await _lockService.ReleaseLock(wf.Id);
}

if (persistSate)
await _queueService.QueueWork(id, QueueType.Index);
if (persistState)
await _queueService.QueueWork(wf.Id, QueueType.Index);

return wf;
}
Expand Down