diff --git a/build.gradle b/build.gradle index ef6ae0d62..09f92e3b0 100644 --- a/build.gradle +++ b/build.gradle @@ -37,7 +37,7 @@ googleJavaFormat { } group = 'com.uber.cadence' -version = '1.0.4' +version = '1.0.5' description = """Uber Cadence Java Client""" diff --git a/src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java b/src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java index a3a3d299b..9916cc5fe 100644 --- a/src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java +++ b/src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java @@ -291,12 +291,12 @@ public void processSignal(String signalName, byte[] input, long eventId) { signalMethod.invoke(workflow, args); } catch (IllegalAccessException e) { throw new Error("Failure processing \"" + signalName + "\" at eventID " + eventId, e); - } catch (DataConverterException e){ + } catch (DataConverterException e) { logSerializationException(signalName, eventId, e); } catch (InvocationTargetException e) { Throwable targetException = e.getTargetException(); if (targetException instanceof DataConverterException) { - logSerializationException(signalName, eventId, (DataConverterException)targetException); + logSerializationException(signalName, eventId, (DataConverterException) targetException); } else { throw new Error( "Failure processing \"" + signalName + "\" at eventID " + eventId, targetException); @@ -305,14 +305,15 @@ public void processSignal(String signalName, byte[] input, long eventId) { } } - void logSerializationException(String signalName, Long eventId, DataConverterException exception){ + void logSerializationException( + String signalName, Long eventId, DataConverterException exception) { log.error( - "Failure deserializing signal input for \"" - + signalName - + "\" at eventID " - + eventId - + ". Dropping it.", - exception); + "Failure deserializing signal input for \"" + + signalName + + "\" at eventID " + + eventId + + ". Dropping it.", + exception); metricsScope.counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1); } diff --git a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java index c49402483..26bf5421c 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java @@ -112,22 +112,23 @@ public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) { @Override public Worker newWorker(String taskList) { - return newWorker(taskList, x->x); + return newWorker(taskList, x -> x); } @Override - public Worker newWorker(String taskList, Function overrideOptions) { + public Worker newWorker( + String taskList, Function overrideOptions) { WorkerOptions.Builder builder = - new WorkerOptions.Builder() - .setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory()) - .setMetricsScope(testEnvironmentOptions.getMetricsScope()) - .setEnableLoggingInReplay(testEnvironmentOptions.isLoggingEnabledInReplay()); + new WorkerOptions.Builder() + .setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory()) + .setMetricsScope(testEnvironmentOptions.getMetricsScope()) + .setEnableLoggingInReplay(testEnvironmentOptions.isLoggingEnabledInReplay()); if (testEnvironmentOptions.getDataConverter() != null) { builder.setDataConverter(testEnvironmentOptions.getDataConverter()); } builder = overrideOptions.apply(builder); Worker result = - new Worker(service, testEnvironmentOptions.getDomain(), taskList, builder.build()); + new Worker(service, testEnvironmentOptions.getDomain(), taskList, builder.build()); workers.add(result); return result; } diff --git a/src/main/java/com/uber/cadence/testing/TestWorkflowEnvironment.java b/src/main/java/com/uber/cadence/testing/TestWorkflowEnvironment.java index fb2aa97da..d9999d57c 100644 --- a/src/main/java/com/uber/cadence/testing/TestWorkflowEnvironment.java +++ b/src/main/java/com/uber/cadence/testing/TestWorkflowEnvironment.java @@ -113,7 +113,8 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) { * @param taskList task list to poll. * @param overrideOptions is used to override the default worker options. */ - Worker newWorker(String taskList, Function overrideOptions); + Worker newWorker( + String taskList, Function overrideOptions); /** Creates a WorkflowClient that is connected to the in-memory test Cadence service. */ WorkflowClient newWorkflowClient(); diff --git a/src/test/java/com/uber/cadence/workflow/MetricsTest.java b/src/test/java/com/uber/cadence/workflow/MetricsTest.java index 3dc3703b9..58644e6c9 100644 --- a/src/test/java/com/uber/cadence/workflow/MetricsTest.java +++ b/src/test/java/com/uber/cadence/workflow/MetricsTest.java @@ -39,7 +39,6 @@ import java.time.Duration; import java.util.Map; import java.util.function.Function; - import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestWatcher; @@ -116,10 +115,12 @@ public interface ReceiveSignalObjectChildWorkflow { void close(); } - public static class ReceiveSignalObjectChildWorkflowImpl implements ReceiveSignalObjectChildWorkflow { + public static class ReceiveSignalObjectChildWorkflowImpl + implements ReceiveSignalObjectChildWorkflow { private String receivedSignal = "Initial State"; // Keep workflow open so that we can send signal CompletablePromise promise = Workflow.newPromise(); + @Override public String execute() { promise.get(); @@ -147,7 +148,7 @@ public static class SendSignalObjectWorkflowImpl implements SendSignalObjectWork @Override public String execute() { ReceiveSignalObjectChildWorkflow child = - Workflow.newChildWorkflowStub(ReceiveSignalObjectChildWorkflow.class); + Workflow.newChildWorkflowStub(ReceiveSignalObjectChildWorkflow.class); Promise greeting = Async.function(child::execute); Signal sig = new Signal(); sig.value = "Hello World"; @@ -162,15 +163,12 @@ public static class Signal { public String value; } - public void setUp(com.uber.m3.util.Duration reportingFrequecy){ + public void setUp(com.uber.m3.util.Duration reportingFrequecy) { reporter = mock(StatsReporter.class); - Scope scope = - new RootScopeBuilder() - .reporter(reporter) - .reportEvery(reportingFrequecy); + Scope scope = new RootScopeBuilder().reporter(reporter).reportEvery(reportingFrequecy); TestEnvironmentOptions testOptions = - new Builder().setDomain(WorkflowTest.DOMAIN).setMetricsScope(scope).build(); + new Builder().setDomain(WorkflowTest.DOMAIN).setMetricsScope(scope).build(); testEnvironment = TestWorkflowEnvironment.newInstance(testOptions); } @@ -222,45 +220,52 @@ public void testWorkflowMetrics() throws InterruptedException { public void testCorruptedSignalMetrics() throws InterruptedException { setUp(com.uber.m3.util.Duration.ofMillis(300)); - Worker worker = testEnvironment.newWorker(taskList, builder -> - builder.setInterceptorFactory(new CorruptedSignalWorkflowInterceptorFactory())); + Worker worker = + testEnvironment.newWorker( + taskList, + builder -> + builder.setInterceptorFactory(new CorruptedSignalWorkflowInterceptorFactory())); worker.registerWorkflowImplementationTypes( - SendSignalObjectWorkflowImpl.class, ReceiveSignalObjectChildWorkflowImpl.class); + SendSignalObjectWorkflowImpl.class, ReceiveSignalObjectChildWorkflowImpl.class); worker.start(); WorkflowOptions options = - new WorkflowOptions.Builder() - .setExecutionStartToCloseTimeout(Duration.ofSeconds(1000)) - .setTaskList(taskList) - .build(); + new WorkflowOptions.Builder() + .setExecutionStartToCloseTimeout(Duration.ofSeconds(1000)) + .setTaskList(taskList) + .build(); WorkflowClient workflowClient = testEnvironment.newWorkflowClient(); - SendSignalObjectWorkflow workflow = workflowClient.newWorkflowStub(SendSignalObjectWorkflow.class, options); + SendSignalObjectWorkflow workflow = + workflowClient.newWorkflowStub(SendSignalObjectWorkflow.class, options); workflow.execute(); - //Wait for reporter + // Wait for reporter Thread.sleep(600); Map tags = - new ImmutableMap.Builder(2) - .put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN) - .put(MetricsTag.TASK_LIST, taskList) - .build(); + new ImmutableMap.Builder(2) + .put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN) + .put(MetricsTag.TASK_LIST, taskList) + .build(); verify(reporter, times(1)).reportCounter(MetricsType.CORRUPTED_SIGNALS_COUNTER, tags, 2); } private static class CorruptedSignalWorkflowInterceptorFactory - implements Function { - - @Override - public WorkflowInterceptor apply(WorkflowInterceptor next) { - return new SignalWorkflowInterceptor(args -> { - if(args != null && args.length > 0){ - return new Object [] {"Corrupted Signal"}; - } - return args; - }, sig->sig, next); - } + implements Function { + + @Override + public WorkflowInterceptor apply(WorkflowInterceptor next) { + return new SignalWorkflowInterceptor( + args -> { + if (args != null && args.length > 0) { + return new Object[] {"Corrupted Signal"}; + } + return args; + }, + sig -> sig, + next); + } } } diff --git a/src/test/java/com/uber/cadence/workflow/interceptors/SignalWorkflowInterceptor.java b/src/test/java/com/uber/cadence/workflow/interceptors/SignalWorkflowInterceptor.java index d3570aca0..b4e29836a 100644 --- a/src/test/java/com/uber/cadence/workflow/interceptors/SignalWorkflowInterceptor.java +++ b/src/test/java/com/uber/cadence/workflow/interceptors/SignalWorkflowInterceptor.java @@ -20,7 +20,6 @@ import com.uber.cadence.WorkflowExecution; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.workflow.*; - import java.lang.reflect.Type; import java.time.Duration; import java.util.Objects; @@ -33,95 +32,113 @@ public class SignalWorkflowInterceptor implements WorkflowInterceptor { - private Function overrideArgs; - private Function overrideSignalName; - private final WorkflowInterceptor next; - - public SignalWorkflowInterceptor(Function overrideArgs, - Function overrideSignalName, - WorkflowInterceptor next) { - this.overrideArgs = overrideArgs; - this.overrideSignalName = overrideSignalName; - this.next = Objects.requireNonNull(next); - } - - @Override - public Promise executeActivity(String activityName, Class resultClass, Type resultType, Object[] args, ActivityOptions options) { - return next.executeActivity(activityName, resultClass, resultType, args, options); - } - - @Override - public WorkflowResult executeChildWorkflow(String workflowType, Class resultClass, Type resultType, Object[] args, ChildWorkflowOptions options) { - return next.executeChildWorkflow(workflowType, resultClass, resultType, args, options); - } - - @Override - public Random newRandom() { - return next.newRandom(); - } - - @Override - public Promise signalExternalWorkflow( - WorkflowExecution execution, String signalName, Object[] args) { - if (args != null && args.length > 0) { - args = new Object[]{"corrupted signal"}; - } - return next.signalExternalWorkflow(execution, overrideSignalName.apply(signalName), overrideArgs.apply(args)); - } - - @Override - public Promise cancelWorkflow(WorkflowExecution execution) { - return next.cancelWorkflow(execution); - } - - @Override - public void sleep(Duration duration) { - next.sleep(duration); - } - - @Override - public boolean await(Duration timeout, String reason, Supplier unblockCondition) { - return next.await(timeout, reason, unblockCondition); - } - - @Override - public void await(String reason, Supplier unblockCondition) { - next.await(reason, unblockCondition); - } - - @Override - public Promise newTimer(Duration duration) { - return next.newTimer(duration); - } - - @Override - public R sideEffect(Class resultClass, Type resultType, Functions.Func func) { - return next.sideEffect(resultClass, resultType, func); - } - - @Override - public R mutableSideEffect(String id, Class resultClass, Type resultType, BiPredicate updated, Functions.Func func) { - return null; - } - - @Override - public int getVersion(String changeID, int minSupported, int maxSupported) { - return next.getVersion(changeID, minSupported, maxSupported); - } - - @Override - public void continueAsNew( - Optional workflowType, Optional options, Object[] args) { - next.continueAsNew(workflowType, options, args); - } - - @Override - public void registerQuery(String queryType, Type[] argTypes, Functions.Func1 callback) { - next.registerQuery(queryType, argTypes, callback); - } - - @Override - public UUID randomUUID() { - return next.randomUUID(); - } + private Function overrideArgs; + private Function overrideSignalName; + private final WorkflowInterceptor next; + + public SignalWorkflowInterceptor( + Function overrideArgs, + Function overrideSignalName, + WorkflowInterceptor next) { + this.overrideArgs = overrideArgs; + this.overrideSignalName = overrideSignalName; + this.next = Objects.requireNonNull(next); + } + + @Override + public Promise executeActivity( + String activityName, + Class resultClass, + Type resultType, + Object[] args, + ActivityOptions options) { + return next.executeActivity(activityName, resultClass, resultType, args, options); + } + + @Override + public WorkflowResult executeChildWorkflow( + String workflowType, + Class resultClass, + Type resultType, + Object[] args, + ChildWorkflowOptions options) { + return next.executeChildWorkflow(workflowType, resultClass, resultType, args, options); + } + + @Override + public Random newRandom() { + return next.newRandom(); + } + + @Override + public Promise signalExternalWorkflow( + WorkflowExecution execution, String signalName, Object[] args) { + if (args != null && args.length > 0) { + args = new Object[] {"corrupted signal"}; + } + return next.signalExternalWorkflow( + execution, overrideSignalName.apply(signalName), overrideArgs.apply(args)); + } + + @Override + public Promise cancelWorkflow(WorkflowExecution execution) { + return next.cancelWorkflow(execution); + } + + @Override + public void sleep(Duration duration) { + next.sleep(duration); + } + + @Override + public boolean await(Duration timeout, String reason, Supplier unblockCondition) { + return next.await(timeout, reason, unblockCondition); + } + + @Override + public void await(String reason, Supplier unblockCondition) { + next.await(reason, unblockCondition); + } + + @Override + public Promise newTimer(Duration duration) { + return next.newTimer(duration); + } + + @Override + public R sideEffect(Class resultClass, Type resultType, Functions.Func func) { + return next.sideEffect(resultClass, resultType, func); + } + + @Override + public R mutableSideEffect( + String id, + Class resultClass, + Type resultType, + BiPredicate updated, + Functions.Func func) { + return null; + } + + @Override + public int getVersion(String changeID, int minSupported, int maxSupported) { + return next.getVersion(changeID, minSupported, maxSupported); + } + + @Override + public void continueAsNew( + Optional workflowType, Optional options, Object[] args) { + next.continueAsNew(workflowType, options, args); + } + + @Override + public void registerQuery( + String queryType, Type[] argTypes, Functions.Func1 callback) { + next.registerQuery(queryType, argTypes, callback); + } + + @Override + public UUID randomUUID() { + return next.randomUUID(); + } }